diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index beb5aa3a6..ae4735898 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -156,7 +156,7 @@ class PresenceHandler(BaseHandler): defer.returnValue(True) if (yield self.store.user_rooms_intersect( - [observer_user, observed_user] + [u.to_string() for u in observer_user, observed_user] )): defer.returnValue(True) @@ -772,15 +772,52 @@ class PresenceEventSource(object): self.hs = hs self.clock = hs.get_clock() + @defer.inlineCallbacks + def is_visible(self, observer_user, observed_user): + if observer_user == observed_user: + defer.returnValue(True) + + presence = self.hs.get_handlers().presence_handler + + if (yield presence.store.user_rooms_intersect( + [u.to_string() for u in observer_user, observed_user] + )): + defer.returnValue(True) + + if observed_user.is_mine: + pushmap = presence._local_pushmap + + defer.returnValue( + observed_user.localpart in pushmap and + observer_user in pushmap[observed_user.localpart] + ) + else: + recvmap = presence._remote_recvmap + + defer.returnValue( + observed_user in recvmap and + observer_user in recvmap[observed_user] + ) + + @defer.inlineCallbacks def get_new_events_for_user(self, user, from_key, limit): from_key = int(from_key) + observer_user = user + presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap - # TODO(paul): limit, and filter by visibility - updates = [(k, cachemap[k]) for k in cachemap - if from_key < cachemap[k].serial] + updates = [] + # TODO(paul): use a DeferredList ? How to limit concurrency. + for observed_user in cachemap.keys(): + if not (from_key < cachemap[observed_user].serial): + continue + + if (yield self.is_visible(observer_user, observed_user)): + updates.append((observed_user, cachemap[observed_user])) + + # TODO(paul): limit if updates: clock = self.clock @@ -788,14 +825,15 @@ class PresenceEventSource(object): latest_serial = max([x[1].serial for x in updates]) data = [x[1].make_event(user=x[0], clock=clock) for x in updates] - return ((data, latest_serial)) + defer.returnValue((data, latest_serial)) else: - return (([], presence._user_cachemap_latest_serial)) + defer.returnValue(([], presence._user_cachemap_latest_serial)) def get_current_key(self): presence = self.hs.get_handlers().presence_handler return presence._user_cachemap_latest_serial + @defer.inlineCallbacks def get_pagination_rows(self, user, pagination_config, key): # TODO (erikj): Does this make sense? Ordering? @@ -812,7 +850,17 @@ class PresenceEventSource(object): presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap - # TODO(paul): limit, and filter by visibility + updates = [] + # TODO(paul): use a DeferredList ? How to limit concurrency. + for observed_user in cachemap.keys(): + if not (to_key < cachemap[observed_user].serial < from_key): + continue + + if (yield self.is_visible(observer_user, observed_user)): + updates.append((observed_user, cachemap[observed_user])) + + # TODO(paul): limit + updates = [(k, cachemap[k]) for k in cachemap if to_key < cachemap[k].serial < from_key] @@ -830,13 +878,13 @@ class PresenceEventSource(object): next_token = next_token.copy_and_replace( "presence_key", earliest_serial ) - return ((data, next_token)) + defer.returnValue((data, next_token)) else: if not to_token: to_token = from_token.copy_and_replace( "presence_key", 0 ) - return (([], to_token)) + defer.returnValue(([], to_token)) class UserPresenceCache(object): diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 4583ff8bc..8506961d2 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -118,7 +118,9 @@ class PresenceStateTestCase(unittest.TestCase): room_member_handler.get_room_members = get_room_members def user_rooms_intersect(userlist): - shared = all(map(lambda u: u in self.room_members, userlist)) + room_member_ids = map(lambda u: u.to_string(), self.room_members) + + shared = all(map(lambda i: i in room_member_ids, userlist)) return defer.succeed(shared) self.datastore.user_rooms_intersect = user_rooms_intersect @@ -562,6 +564,13 @@ class PresencePushTestCase(unittest.TestCase): return defer.succeed([]) self.datastore.get_joined_hosts_for_room = get_room_hosts + def user_rooms_intersect(userlist): + room_member_ids = map(lambda u: u.to_string(), self.room_members) + + shared = all(map(lambda i: i in room_member_ids, userlist)) + return defer.succeed(shared) + self.datastore.user_rooms_intersect = user_rooms_intersect + @defer.inlineCallbacks def fetch_room_distributions_into(room_id, localusers=None, remotedomains=None, ignore_user=None): @@ -604,6 +613,7 @@ class PresencePushTestCase(unittest.TestCase): self.u_apple = hs.parse_userid("@apple:test") self.u_banana = hs.parse_userid("@banana:test") self.u_clementine = hs.parse_userid("@clementine:test") + self.u_durian = hs.parse_userid("@durian:test") self.u_elderberry = hs.parse_userid("@elderberry:test") # Remote user @@ -632,6 +642,7 @@ class PresencePushTestCase(unittest.TestCase): {"presence": ONLINE} ) + # Apple sees self-reflection (events, _) = yield self.event_source.get_new_events_for_user( self.u_apple, 0, None ) @@ -647,6 +658,55 @@ class PresencePushTestCase(unittest.TestCase): "last_active_ago": 0, }}, ], + msg="Presence event should be visible to self-reflection" + ) + + # Banana sees it because of presence subscription + (events, _) = yield self.event_source.get_new_events_for_user( + self.u_banana, 0, None + ) + + self.assertEquals(self.event_source.get_current_key(), 1) + self.assertEquals(events, + [ + {"type": "m.presence", + "content": { + "user_id": "@apple:test", + "presence": ONLINE, + "state": ONLINE, + "last_active_ago": 0, + }}, + ], + msg="Presence event should be visible to explicit subscribers" + ) + + # Elderberry sees it because of same room + (events, _) = yield self.event_source.get_new_events_for_user( + self.u_elderberry, 0, None + ) + + self.assertEquals(self.event_source.get_current_key(), 1) + self.assertEquals(events, + [ + {"type": "m.presence", + "content": { + "user_id": "@apple:test", + "presence": ONLINE, + "state": ONLINE, + "last_active_ago": 0, + }}, + ], + msg="Presence event should be visible to other room members" + ) + + # Durian is not in the room, should not see this event + (events, _) = yield self.event_source.get_new_events_for_user( + self.u_durian, 0, None + ) + + self.assertEquals(self.event_source.get_current_key(), 1) + self.assertEquals(events, [], + msg="Presence event should not be visible to others" ) presence = yield self.handler.get_presence_list( @@ -664,6 +724,10 @@ class PresencePushTestCase(unittest.TestCase): presence ) + # TODO(paul): Gut-wrenching + banana_set = self.handler._local_pushmap.setdefault("banana", set()) + banana_set.add(self.u_apple) + yield self.handler.set_state(self.u_banana, self.u_banana, {"presence": ONLINE} ) @@ -825,6 +889,8 @@ class PresencePushTestCase(unittest.TestCase): "a-room" ) + self.room_members.append(self.u_clementine) + (events, _) = yield self.event_source.get_new_events_for_user( self.u_apple, 0, None ) diff --git a/tests/rest/test_presence.py b/tests/rest/test_presence.py index e2cdd80e0..df8dd7415 100644 --- a/tests/rest/test_presence.py +++ b/tests/rest/test_presence.py @@ -269,11 +269,16 @@ class PresenceEventStreamTestCase(unittest.TestCase): hs.register_servlets() - hs.handlers.room_member_handler = Mock(spec=[ - "get_rooms_for_user", - ]) - hs.handlers.room_member_handler.get_rooms_for_user = ( - lambda u: defer.succeed([])) + hs.handlers.room_member_handler = Mock(spec=[]) + + self.room_members = [] + + def get_rooms_for_user(user): + if user in self.room_members: + return ["a-room"] + else: + return [] + hs.handlers.room_member_handler.get_rooms_for_user = get_rooms_for_user self.mock_datastore = hs.get_datastore() @@ -285,6 +290,17 @@ class PresenceEventStreamTestCase(unittest.TestCase): return defer.succeed(None) self.mock_datastore.get_profile_avatar_url = get_profile_avatar_url + def user_rooms_intersect(user_list): + room_member_ids = map(lambda u: u.to_string(), self.room_members) + + shared = all(map(lambda i: i in room_member_ids, user_list)) + return defer.succeed(shared) + self.mock_datastore.user_rooms_intersect = user_rooms_intersect + + def get_joined_hosts_for_room(room_id): + return [] + self.mock_datastore.get_joined_hosts_for_room = get_joined_hosts_for_room + self.presence = hs.get_handlers().presence_handler self.u_apple = hs.parse_userid("@apple:test") @@ -292,6 +308,8 @@ class PresenceEventStreamTestCase(unittest.TestCase): @defer.inlineCallbacks def test_shortpoll(self): + self.room_members = [self.u_apple, self.u_banana] + self.mock_datastore.set_presence_state.return_value = defer.succeed( {"state": ONLINE}) self.mock_datastore.get_presence_list.return_value = defer.succeed(