From 2ccf3b241c3f066fce29fc5cdf095ed3c32e9bcc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Mar 2017 18:13:59 +0000 Subject: [PATCH 1/4] Implement no op for room stream in sync --- synapse/handlers/sync.py | 51 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 33b7fdfe8..470082ed2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,6 +20,7 @@ from synapse.util.metrics import Measure, measure_func from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client +from synapse.types import RoomStreamToken from twisted.internet import defer @@ -765,6 +766,19 @@ class SyncHandler(object): ) sync_result_builder.now_token = now_token + since_token = sync_result_builder.since_token + if not sync_result_builder.full_state: + if since_token and not ephemeral_by_room and not account_data_by_room: + have_changed = yield self._have_rooms_changed(sync_result_builder) + if not have_changed: + tags_by_room = yield self.store.get_updated_tags( + user_id, + since_token.account_data_key, + ) + if not tags_by_room: + logger.info("no-oping sync") + defer.returnValue(([], [])) + ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id, ) @@ -774,13 +788,12 @@ class SyncHandler(object): else: ignored_users = frozenset() - if sync_result_builder.since_token: + if since_token: res = yield self._get_rooms_changed(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res tags_by_room = yield self.store.get_updated_tags( - user_id, - sync_result_builder.since_token.account_data_key, + user_id, since_token.account_data_key, ) else: res = yield self._get_all_rooms(sync_result_builder, ignored_users) @@ -805,7 +818,7 @@ class SyncHandler(object): # Now we want to get any newly joined users newly_joined_users = set() - if sync_result_builder.since_token: + if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( joined_sync.timeline.events, joined_sync.state.values() @@ -817,6 +830,36 @@ class SyncHandler(object): defer.returnValue((newly_joined_rooms, newly_joined_users)) + @defer.inlineCallbacks + def _have_rooms_changed(self, sync_result_builder): + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + + # assert since_token + + # Get a list of membership change events that have happened. + rooms_changed = yield self.store.get_membership_changes_for_user( + user_id, since_token.room_key, now_token.room_key + ) + + if rooms_changed: + defer.returnValue(True) + + app_service = self.store.get_app_service_by_user_id(user_id) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + joined_room_ids = set(r.room_id for r in rooms) + else: + rooms = yield self.store.get_rooms_for_user(user_id) + joined_room_ids = set(r.room_id for r in rooms) + + strema_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream + for room_id in joined_room_ids: + if self.store.has_room_changed_since(room_id, strema_id): + defer.returnValue(True) + defer.returnValue(False) + @defer.inlineCallbacks def _get_rooms_changed(self, sync_result_builder, ignored_users): """Gets the the changes that have happened since the last sync. From 6957bfdca6658114526033e839ceec38b988a323 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Mar 2017 11:51:46 +0000 Subject: [PATCH 2/4] Don't recreate so many sets --- synapse/app/synchrotron.py | 3 +-- synapse/handlers/device.py | 14 ++++++-------- synapse/handlers/presence.py | 17 +++++++++-------- synapse/handlers/profile.py | 8 ++++---- synapse/handlers/receipts.py | 5 ++--- synapse/handlers/sync.py | 18 +++++++----------- synapse/notifier.py | 6 ++---- synapse/push/push_tools.py | 8 ++++---- synapse/rest/client/v1/room.py | 3 +-- synapse/storage/roommember.py | 11 ++++++----- 10 files changed, 42 insertions(+), 51 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 29f075aa5..449fac771 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -399,8 +399,7 @@ class SynchrotronServer(HomeServer): position = row[position_index] user_id = row[user_index] - rooms = yield store.get_rooms_for_user(user_id) - room_ids = [r.room_id for r in rooms] + room_ids = yield store.get_rooms_for_user(user_id) notifier.on_new_event( "device_list_key", position, rooms=room_ids, diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 1b007d494..c22f65ce5 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -248,8 +248,7 @@ class DeviceHandler(BaseHandler): user_id, device_ids, list(hosts) ) - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = [r.room_id for r in rooms] + room_ids = yield self.store.get_rooms_for_user(user_id) yield self.notifier.on_new_event( "device_list_key", position, rooms=room_ids, @@ -270,8 +269,7 @@ class DeviceHandler(BaseHandler): user_id (str) from_token (StreamToken) """ - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = set(r.room_id for r in rooms) + room_ids = yield self.store.get_rooms_for_user(user_id) # First we check if any devices have changed changed = yield self.store.get_user_whose_devices_changed( @@ -347,8 +345,8 @@ class DeviceHandler(BaseHandler): @defer.inlineCallbacks def user_left_room(self, user, room_id): user_id = user.to_string() - rooms = yield self.store.get_rooms_for_user(user_id) - if not rooms: + room_ids = yield self.store.get_rooms_for_user(user_id) + if not room_ids: # We no longer share rooms with this user, so we'll no longer # receive device updates. Mark this in DB. yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id) @@ -404,8 +402,8 @@ class DeviceListEduUpdater(object): logger.warning("Got device list update edu for %r from %r", user_id, origin) return - rooms = yield self.store.get_rooms_for_user(user_id) - if not rooms: + room_ids = yield self.store.get_rooms_for_user(user_id) + if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. return diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e562a2e87..059260a8a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -557,9 +557,9 @@ class PresenceHandler(object): room_ids_to_states = {} users_to_states = {} for state in states: - events = yield self.store.get_rooms_for_user(state.user_id) - for e in events: - room_ids_to_states.setdefault(e.room_id, []).append(state) + room_ids = yield self.store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) plist = yield self.store.get_presence_list_observers_accepted(state.user_id) for u in plist: @@ -913,11 +913,12 @@ class PresenceHandler(object): def is_visible(self, observed_user, observer_user): """Returns whether a user can see another user's presence. """ - observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string()) - observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string()) - - observer_room_ids = set(r.room_id for r in observer_rooms) - observed_room_ids = set(r.room_id for r in observed_rooms) + observer_room_ids = yield self.store.get_rooms_for_user( + observer_user.to_string() + ) + observed_room_ids = yield self.store.get_rooms_for_user( + observed_user.to_string() + ) if observer_room_ids & observed_room_ids: defer.returnValue(True) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 87f74dfb8..abd1fb28c 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -156,11 +156,11 @@ class ProfileHandler(BaseHandler): self.ratelimit(requester) - joins = yield self.store.get_rooms_for_user( + room_ids = yield self.store.get_rooms_for_user( user.to_string(), ) - for j in joins: + for room_id in room_ids: handler = self.hs.get_handlers().room_member_handler try: # Assume the user isn't a guest because we don't let guests set @@ -171,12 +171,12 @@ class ProfileHandler(BaseHandler): yield handler.update_membership( requester, user, - j.room_id, + room_id, "join", # We treat a profile update like a join. ratelimit=False, # Try to hide that these events aren't atomic. ) except Exception as e: logger.warn( "Failed to update join event for room %s - %s", - j.room_id, str(e.message) + room_id, str(e.message) ) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 50aa51393..e1cd3a48e 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -210,10 +210,9 @@ class ReceiptEventSource(object): else: from_key = None - rooms = yield self.store.get_rooms_for_user(user.to_string()) - rooms = [room.room_id for room in rooms] + room_ids = yield self.store.get_rooms_for_user(user.to_string()) events = yield self.store.get_linearized_receipts_for_rooms( - rooms, + room_ids, from_key=from_key, to_key=to_key, ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 470082ed2..e17fb3073 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -226,8 +226,7 @@ class SyncHandler(object): with Measure(self.clock, "ephemeral_by_room"): typing_key = since_token.typing_key if since_token else "0" - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] + room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string()) typing_source = self.event_sources.sources["typing"] typing, typing_key = yield typing_source.get_new_events( @@ -569,16 +568,15 @@ class SyncHandler(object): since_token = sync_result_builder.since_token if since_token and since_token.device_list_key: - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = set(r.room_id for r in rooms) + room_ids = yield self.store.get_rooms_for_user(user_id) user_ids_changed = set() changed = yield self.store.get_user_whose_devices_changed( since_token.device_list_key ) for other_user_id in changed: - other_rooms = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(e.room_id for e in other_rooms): + other_room_ids = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(other_room_ids): user_ids_changed.add(other_user_id) defer.returnValue(user_ids_changed) @@ -776,7 +774,7 @@ class SyncHandler(object): since_token.account_data_key, ) if not tags_by_room: - logger.info("no-oping sync") + logger.debug("no-oping sync") defer.returnValue(([], [])) ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( @@ -851,8 +849,7 @@ class SyncHandler(object): rooms = yield self.store.get_app_service_rooms(app_service) joined_room_ids = set(r.room_id for r in rooms) else: - rooms = yield self.store.get_rooms_for_user(user_id) - joined_room_ids = set(r.room_id for r in rooms) + joined_room_ids = yield self.store.get_rooms_for_user(user_id) strema_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream for room_id in joined_room_ids: @@ -884,8 +881,7 @@ class SyncHandler(object): rooms = yield self.store.get_app_service_rooms(app_service) joined_room_ids = set(r.room_id for r in rooms) else: - rooms = yield self.store.get_rooms_for_user(user_id) - joined_room_ids = set(r.room_id for r in rooms) + joined_room_ids = yield self.store.get_rooms_for_user(user_id) # Get a list of membership change events that have happened. rooms_changed = yield self.store.get_membership_changes_for_user( diff --git a/synapse/notifier.py b/synapse/notifier.py index 31f723d94..7eeba6d28 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -304,8 +304,7 @@ class Notifier(object): if user_stream is None: current_token = yield self.event_sources.get_current_token() if room_ids is None: - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = [room.room_id for room in rooms] + room_ids = yield self.store.get_rooms_for_user(user_id) user_stream = _NotifierUserStream( user_id=user_id, rooms=room_ids, @@ -454,8 +453,7 @@ class Notifier(object): @defer.inlineCallbacks def _get_room_ids(self, user, explicit_room_id): - joined_rooms = yield self.store.get_rooms_for_user(user.to_string()) - joined_room_ids = map(lambda r: r.room_id, joined_rooms) + joined_room_ids = yield self.store.get_rooms_for_user(user.to_string()) if explicit_room_id: if explicit_room_id in joined_room_ids: defer.returnValue(([explicit_room_id], True)) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index a27476bba..287df94b4 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -33,13 +33,13 @@ def get_badge_count(store, user_id): badge = len(invites) - for r in joins: - if r.room_id in my_receipts_by_room: - last_unread_event_id = my_receipts_by_room[r.room_id] + for room_id in joins: + if room_id in my_receipts_by_room: + last_unread_event_id = my_receipts_by_room[room_id] notifs = yield ( store.get_unread_event_push_actions_by_room_for_user( - r.room_id, user_id, last_unread_event_id + room_id, user_id, last_unread_event_id ) ) # return one badge count per conversation, as count per diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 90242a6ba..0bdd6b5b3 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -748,8 +748,7 @@ class JoinedRoomsRestServlet(ClientV1RestServlet): def on_GET(self, request): requester = yield self.auth.get_user_by_req(request, allow_guest=True) - rooms = yield self.store.get_rooms_for_user(requester.user.to_string()) - room_ids = set(r.room_id for r in rooms) # Ensure they're unique. + room_ids = yield self.store.get_rooms_for_user(requester.user.to_string()) defer.returnValue((200, {"joined_rooms": list(room_ids)})) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 545d3d3a9..5f044a3f1 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -274,24 +274,25 @@ class RoomMemberStore(SQLBaseStore): return rows - @cached(max_entries=500000, iterable=True) + @cachedInlineCallbacks(max_entries=500000, iterable=True) def get_rooms_for_user(self, user_id): - return self.get_rooms_for_user_where_membership_is( + rooms = yield self.get_rooms_for_user_where_membership_is( user_id, membership_list=[Membership.JOIN], ) + defer.returnValue(frozenset(r.room_id for r in rooms)) @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True) def get_users_who_share_room_with_user(self, user_id, cache_context): """Returns the set of users who share a room with `user_id` """ - rooms = yield self.get_rooms_for_user( + room_ids = yield self.get_rooms_for_user( user_id, on_invalidate=cache_context.invalidate, ) user_who_share_room = set() - for room in rooms: + for room_id in room_ids: user_ids = yield self.get_users_in_room( - room.room_id, on_invalidate=cache_context.invalidate, + room_id, on_invalidate=cache_context.invalidate, ) user_who_share_room.update(user_ids) From a158c36a8a739226347e56b87047a97507442dff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Mar 2017 11:56:59 +0000 Subject: [PATCH 3/4] Comment --- synapse/handlers/sync.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e17fb3073..3dbedb2c4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -764,6 +764,8 @@ class SyncHandler(object): ) sync_result_builder.now_token = now_token + # We check up front if anything has changed, if it hasn't then there is + # no point in going futher. since_token = sync_result_builder.since_token if not sync_result_builder.full_state: if since_token and not ephemeral_by_room and not account_data_by_room: @@ -830,11 +832,14 @@ class SyncHandler(object): @defer.inlineCallbacks def _have_rooms_changed(self, sync_result_builder): + """Returns whether any rooms have changed since the sync. Must be an + incremental sync + """ user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token - # assert since_token + assert since_token # Get a list of membership change events that have happened. rooms_changed = yield self.store.get_membership_changes_for_user( @@ -851,9 +856,9 @@ class SyncHandler(object): else: joined_room_ids = yield self.store.get_rooms_for_user(user_id) - strema_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream + stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream for room_id in joined_room_ids: - if self.store.has_room_changed_since(room_id, strema_id): + if self.store.has_room_changed_since(room_id, stream_id): defer.returnValue(True) defer.returnValue(False) From da146657c9a8685eb927cd295891332af86ba15d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Mar 2017 13:04:07 +0000 Subject: [PATCH 4/4] Comments --- synapse/handlers/sync.py | 4 ++-- synapse/storage/roommember.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3dbedb2c4..c0205da1a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -832,8 +832,8 @@ class SyncHandler(object): @defer.inlineCallbacks def _have_rooms_changed(self, sync_result_builder): - """Returns whether any rooms have changed since the sync. Must be an - incremental sync + """Returns whether there may be any new events that should be sent down + the sync. Returns True if there are. """ user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 5f044a3f1..e38d8927b 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -276,6 +276,8 @@ class RoomMemberStore(SQLBaseStore): @cachedInlineCallbacks(max_entries=500000, iterable=True) def get_rooms_for_user(self, user_id): + """Returns a set of room_ids the user is currently joined to + """ rooms = yield self.get_rooms_for_user_where_membership_is( user_id, membership_list=[Membership.JOIN], )