mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-16 15:01:23 +01:00
1c802de626
Allow configuring the set of workers to proxy outbound federation traffic through (`outbound_federation_restricted_to`). This is useful when you have a worker setup with `federation_sender` instances responsible for sending outbound federation requests and want to make sure *all* outbound federation traffic goes through those instances. Before this change, the generic workers would still contact federation themselves for things like profile lookups, backfill, etc. This PR allows you to set more strict access controls/firewall for all workers and only allow the `federation_sender`'s to contact the outside world.
1154 lines
41 KiB
Python
1154 lines
41 KiB
Python
# Copyright 2016 OpenMarket Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from typing import Optional, cast
|
|
from unittest.mock import Mock, call
|
|
|
|
from parameterized import parameterized
|
|
from signedjson.key import generate_signing_key
|
|
|
|
from twisted.test.proto_helpers import MemoryReactor
|
|
|
|
from synapse.api.constants import EventTypes, Membership, PresenceState
|
|
from synapse.api.presence import UserPresenceState
|
|
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
|
from synapse.events.builder import EventBuilder
|
|
from synapse.federation.sender import FederationSender
|
|
from synapse.handlers.presence import (
|
|
EXTERNAL_PROCESS_EXPIRY,
|
|
FEDERATION_PING_INTERVAL,
|
|
FEDERATION_TIMEOUT,
|
|
IDLE_TIMER,
|
|
LAST_ACTIVE_GRANULARITY,
|
|
SYNC_ONLINE_TIMEOUT,
|
|
handle_timeout,
|
|
handle_update,
|
|
)
|
|
from synapse.rest import admin
|
|
from synapse.rest.client import room
|
|
from synapse.server import HomeServer
|
|
from synapse.types import JsonDict, UserID, get_domain_from_id
|
|
from synapse.util import Clock
|
|
|
|
from tests import unittest
|
|
from tests.replication._base import BaseMultiWorkerStreamTestCase
|
|
|
|
|
|
class PresenceUpdateTestCase(unittest.HomeserverTestCase):
|
|
servlets = [admin.register_servlets]
|
|
|
|
def prepare(
|
|
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
|
|
) -> None:
|
|
self.store = homeserver.get_datastores().main
|
|
|
|
def test_offline_to_online(self) -> None:
|
|
wheel_timer = Mock()
|
|
user_id = "@foo:bar"
|
|
now = 5000000
|
|
|
|
prev_state = UserPresenceState.default(user_id)
|
|
new_state = prev_state.copy_and_replace(
|
|
state=PresenceState.ONLINE, last_active_ts=now
|
|
)
|
|
|
|
state, persist_and_notify, federation_ping = handle_update(
|
|
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
|
|
)
|
|
|
|
self.assertTrue(persist_and_notify)
|
|
self.assertTrue(state.currently_active)
|
|
self.assertEqual(new_state.state, state.state)
|
|
self.assertEqual(new_state.status_msg, state.status_msg)
|
|
self.assertEqual(state.last_federation_update_ts, now)
|
|
|
|
self.assertEqual(wheel_timer.insert.call_count, 3)
|
|
wheel_timer.insert.assert_has_calls(
|
|
[
|
|
call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
|
|
call(
|
|
now=now,
|
|
obj=user_id,
|
|
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
|
|
),
|
|
call(
|
|
now=now,
|
|
obj=user_id,
|
|
then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
|
|
),
|
|
],
|
|
any_order=True,
|
|
)
|
|
|
|
def test_online_to_online(self) -> None:
|
|
wheel_timer = Mock()
|
|
user_id = "@foo:bar"
|
|
now = 5000000
|
|
|
|
prev_state = UserPresenceState.default(user_id)
|
|
prev_state = prev_state.copy_and_replace(
|
|
state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
|
|
)
|
|
|
|
new_state = prev_state.copy_and_replace(
|
|
state=PresenceState.ONLINE, last_active_ts=now
|
|
)
|
|
|
|
state, persist_and_notify, federation_ping = handle_update(
|
|
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
|
|
)
|
|
|
|
self.assertFalse(persist_and_notify)
|
|
self.assertTrue(federation_ping)
|
|
self.assertTrue(state.currently_active)
|
|
self.assertEqual(new_state.state, state.state)
|
|
self.assertEqual(new_state.status_msg, state.status_msg)
|
|
self.assertEqual(state.last_federation_update_ts, now)
|
|
|
|
self.assertEqual(wheel_timer.insert.call_count, 3)
|
|
wheel_timer.insert.assert_has_calls(
|
|
[
|
|
call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
|
|
call(
|
|
now=now,
|
|
obj=user_id,
|
|
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
|
|
),
|
|
call(
|
|
now=now,
|
|
obj=user_id,
|
|
then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
|
|
),
|
|
],
|
|
any_order=True,
|
|
)
|
|
|
|
def test_online_to_online_last_active_noop(self) -> None:
|
|
wheel_timer = Mock()
|
|
user_id = "@foo:bar"
|
|
now = 5000000
|
|
|
|
prev_state = UserPresenceState.default(user_id)
|
|
prev_state = prev_state.copy_and_replace(
|
|
state=PresenceState.ONLINE,
|
|
last_active_ts=now - LAST_ACTIVE_GRANULARITY - 10,
|
|
currently_active=True,
|
|
)
|
|
|
|
new_state = prev_state.copy_and_replace(
|
|
state=PresenceState.ONLINE, last_active_ts=now
|
|
)
|
|
|
|
state, persist_and_notify, federation_ping = handle_update(
|
|
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
|
|
)
|
|
|
|
self.assertFalse(persist_and_notify)
|
|
self.assertTrue(federation_ping)
|
|
self.assertTrue(state.currently_active)
|
|
self.assertEqual(new_state.state, state.state)
|
|
self.assertEqual(new_state.status_msg, state.status_msg)
|
|
self.assertEqual(state.last_federation_update_ts, now)
|
|
|
|
self.assertEqual(wheel_timer.insert.call_count, 3)
|
|
wheel_timer.insert.assert_has_calls(
|
|
[
|
|
call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
|
|
call(
|
|
now=now,
|
|
obj=user_id,
|
|
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
|
|
),
|
|
call(
|
|
now=now,
|
|
obj=user_id,
|
|
then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
|
|
),
|
|
],
|
|
any_order=True,
|
|
)
|
|
|
|
def test_online_to_online_last_active(self) -> None:
|
|
wheel_timer = Mock()
|
|
user_id = "@foo:bar"
|
|
now = 5000000
|
|
|
|
prev_state = UserPresenceState.default(user_id)
|
|
prev_state = prev_state.copy_and_replace(
|
|
state=PresenceState.ONLINE,
|
|
last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
|
|
currently_active=True,
|
|
)
|
|
|
|
new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
|
|
|
|
state, persist_and_notify, federation_ping = handle_update(
|
|
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
|
|
)
|
|
|
|
self.assertTrue(persist_and_notify)
|
|
self.assertFalse(state.currently_active)
|
|
self.assertEqual(new_state.state, state.state)
|
|
self.assertEqual(new_state.status_msg, state.status_msg)
|
|
self.assertEqual(state.last_federation_update_ts, now)
|
|
|
|
self.assertEqual(wheel_timer.insert.call_count, 2)
|
|
wheel_timer.insert.assert_has_calls(
|
|
[
|
|
call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
|
|
call(
|
|
now=now,
|
|
obj=user_id,
|
|
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
|
|
),
|
|
],
|
|
any_order=True,
|
|
)
|
|
|
|
def test_remote_ping_timer(self) -> None:
|
|
wheel_timer = Mock()
|
|
user_id = "@foo:bar"
|
|
now = 5000000
|
|
|
|
prev_state = UserPresenceState.default(user_id)
|
|
prev_state = prev_state.copy_and_replace(
|
|
state=PresenceState.ONLINE, last_active_ts=now
|
|
)
|
|
|
|
new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
|
|
|
|
state, persist_and_notify, federation_ping = handle_update(
|
|
prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now
|
|
)
|
|
|
|
self.assertFalse(persist_and_notify)
|
|
self.assertFalse(federation_ping)
|
|
self.assertFalse(state.currently_active)
|
|
self.assertEqual(new_state.state, state.state)
|
|
self.assertEqual(new_state.status_msg, state.status_msg)
|
|
|
|
self.assertEqual(wheel_timer.insert.call_count, 1)
|
|
wheel_timer.insert.assert_has_calls(
|
|
[
|
|
call(
|
|
now=now,
|
|
obj=user_id,
|
|
then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
|
|
)
|
|
],
|
|
any_order=True,
|
|
)
|
|
|
|
def test_online_to_offline(self) -> None:
|
|
wheel_timer = Mock()
|
|
user_id = "@foo:bar"
|
|
now = 5000000
|
|
|
|
prev_state = UserPresenceState.default(user_id)
|
|
prev_state = prev_state.copy_and_replace(
|
|
state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
|
|
)
|
|
|
|
new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE)
|
|
|
|
state, persist_and_notify, federation_ping = handle_update(
|
|
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
|
|
)
|
|
|
|
self.assertTrue(persist_and_notify)
|
|
self.assertEqual(new_state.state, state.state)
|
|
self.assertEqual(state.last_federation_update_ts, now)
|
|
|
|
self.assertEqual(wheel_timer.insert.call_count, 0)
|
|
|
|
def test_online_to_idle(self) -> None:
|
|
wheel_timer = Mock()
|
|
user_id = "@foo:bar"
|
|
now = 5000000
|
|
|
|
prev_state = UserPresenceState.default(user_id)
|
|
prev_state = prev_state.copy_and_replace(
|
|
state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
|
|
)
|
|
|
|
new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE)
|
|
|
|
state, persist_and_notify, federation_ping = handle_update(
|
|
prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
|
|
)
|
|
|
|
self.assertTrue(persist_and_notify)
|
|
self.assertEqual(new_state.state, state.state)
|
|
self.assertEqual(state.last_federation_update_ts, now)
|
|
self.assertEqual(new_state.state, state.state)
|
|
self.assertEqual(new_state.status_msg, state.status_msg)
|
|
|
|
self.assertEqual(wheel_timer.insert.call_count, 1)
|
|
wheel_timer.insert.assert_has_calls(
|
|
[
|
|
call(
|
|
now=now,
|
|
obj=user_id,
|
|
then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
|
|
)
|
|
],
|
|
any_order=True,
|
|
)
|
|
|
|
def test_persisting_presence_updates(self) -> None:
|
|
"""Tests that the latest presence state for each user is persisted correctly"""
|
|
# Create some test users and presence states for them
|
|
presence_states = []
|
|
for i in range(5):
|
|
user_id = self.register_user(f"user_{i}", "password")
|
|
|
|
presence_state = UserPresenceState(
|
|
user_id=user_id,
|
|
state="online",
|
|
last_active_ts=1,
|
|
last_federation_update_ts=1,
|
|
last_user_sync_ts=1,
|
|
status_msg="I'm online!",
|
|
currently_active=True,
|
|
)
|
|
presence_states.append(presence_state)
|
|
|
|
# Persist these presence updates to the database
|
|
self.get_success(self.store.update_presence(presence_states))
|
|
|
|
# Check that each update is present in the database
|
|
db_presence_states_raw = self.get_success(
|
|
self.store.get_all_presence_updates(
|
|
instance_name="master",
|
|
last_id=0,
|
|
current_id=len(presence_states) + 1,
|
|
limit=len(presence_states),
|
|
)
|
|
)
|
|
|
|
# Extract presence update user ID and state information into lists of tuples
|
|
db_presence_states = [(ps[0], ps[1]) for _, ps in db_presence_states_raw[0]]
|
|
presence_states_compare = [(ps.user_id, ps.state) for ps in presence_states]
|
|
|
|
# Compare what we put into the storage with what we got out.
|
|
# They should be identical.
|
|
self.assertEqual(presence_states_compare, db_presence_states)
|
|
|
|
|
|
class PresenceTimeoutTestCase(unittest.TestCase):
|
|
"""Tests different timers and that the timer does not change `status_msg` of user."""
|
|
|
|
def test_idle_timer(self) -> None:
|
|
user_id = "@foo:bar"
|
|
status_msg = "I'm here!"
|
|
now = 5000000
|
|
|
|
state = UserPresenceState.default(user_id)
|
|
state = state.copy_and_replace(
|
|
state=PresenceState.ONLINE,
|
|
last_active_ts=now - IDLE_TIMER - 1,
|
|
last_user_sync_ts=now,
|
|
status_msg=status_msg,
|
|
)
|
|
|
|
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
|
|
|
|
self.assertIsNotNone(new_state)
|
|
assert new_state is not None
|
|
self.assertEqual(new_state.state, PresenceState.UNAVAILABLE)
|
|
self.assertEqual(new_state.status_msg, status_msg)
|
|
|
|
def test_busy_no_idle(self) -> None:
|
|
"""
|
|
Tests that a user setting their presence to busy but idling doesn't turn their
|
|
presence state into unavailable.
|
|
"""
|
|
user_id = "@foo:bar"
|
|
status_msg = "I'm here!"
|
|
now = 5000000
|
|
|
|
state = UserPresenceState.default(user_id)
|
|
state = state.copy_and_replace(
|
|
state=PresenceState.BUSY,
|
|
last_active_ts=now - IDLE_TIMER - 1,
|
|
last_user_sync_ts=now,
|
|
status_msg=status_msg,
|
|
)
|
|
|
|
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
|
|
|
|
self.assertIsNotNone(new_state)
|
|
assert new_state is not None
|
|
self.assertEqual(new_state.state, PresenceState.BUSY)
|
|
self.assertEqual(new_state.status_msg, status_msg)
|
|
|
|
def test_sync_timeout(self) -> None:
|
|
user_id = "@foo:bar"
|
|
status_msg = "I'm here!"
|
|
now = 5000000
|
|
|
|
state = UserPresenceState.default(user_id)
|
|
state = state.copy_and_replace(
|
|
state=PresenceState.ONLINE,
|
|
last_active_ts=0,
|
|
last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
|
|
status_msg=status_msg,
|
|
)
|
|
|
|
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
|
|
|
|
self.assertIsNotNone(new_state)
|
|
assert new_state is not None
|
|
self.assertEqual(new_state.state, PresenceState.OFFLINE)
|
|
self.assertEqual(new_state.status_msg, status_msg)
|
|
|
|
def test_sync_online(self) -> None:
|
|
user_id = "@foo:bar"
|
|
status_msg = "I'm here!"
|
|
now = 5000000
|
|
|
|
state = UserPresenceState.default(user_id)
|
|
state = state.copy_and_replace(
|
|
state=PresenceState.ONLINE,
|
|
last_active_ts=now - SYNC_ONLINE_TIMEOUT - 1,
|
|
last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
|
|
status_msg=status_msg,
|
|
)
|
|
|
|
new_state = handle_timeout(
|
|
state, is_mine=True, syncing_user_ids={user_id}, now=now
|
|
)
|
|
|
|
self.assertIsNotNone(new_state)
|
|
assert new_state is not None
|
|
self.assertEqual(new_state.state, PresenceState.ONLINE)
|
|
self.assertEqual(new_state.status_msg, status_msg)
|
|
|
|
def test_federation_ping(self) -> None:
|
|
user_id = "@foo:bar"
|
|
status_msg = "I'm here!"
|
|
now = 5000000
|
|
|
|
state = UserPresenceState.default(user_id)
|
|
state = state.copy_and_replace(
|
|
state=PresenceState.ONLINE,
|
|
last_active_ts=now,
|
|
last_user_sync_ts=now,
|
|
last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
|
|
status_msg=status_msg,
|
|
)
|
|
|
|
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
|
|
|
|
self.assertIsNotNone(new_state)
|
|
self.assertEqual(state, new_state)
|
|
|
|
def test_no_timeout(self) -> None:
|
|
user_id = "@foo:bar"
|
|
now = 5000000
|
|
|
|
state = UserPresenceState.default(user_id)
|
|
state = state.copy_and_replace(
|
|
state=PresenceState.ONLINE,
|
|
last_active_ts=now,
|
|
last_user_sync_ts=now,
|
|
last_federation_update_ts=now,
|
|
)
|
|
|
|
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
|
|
|
|
self.assertIsNone(new_state)
|
|
|
|
def test_federation_timeout(self) -> None:
|
|
user_id = "@foo:bar"
|
|
status_msg = "I'm here!"
|
|
now = 5000000
|
|
|
|
state = UserPresenceState.default(user_id)
|
|
state = state.copy_and_replace(
|
|
state=PresenceState.ONLINE,
|
|
last_active_ts=now,
|
|
last_user_sync_ts=now,
|
|
last_federation_update_ts=now - FEDERATION_TIMEOUT - 1,
|
|
status_msg=status_msg,
|
|
)
|
|
|
|
new_state = handle_timeout(
|
|
state, is_mine=False, syncing_user_ids=set(), now=now
|
|
)
|
|
|
|
self.assertIsNotNone(new_state)
|
|
assert new_state is not None
|
|
self.assertEqual(new_state.state, PresenceState.OFFLINE)
|
|
self.assertEqual(new_state.status_msg, status_msg)
|
|
|
|
def test_last_active(self) -> None:
|
|
user_id = "@foo:bar"
|
|
status_msg = "I'm here!"
|
|
now = 5000000
|
|
|
|
state = UserPresenceState.default(user_id)
|
|
state = state.copy_and_replace(
|
|
state=PresenceState.ONLINE,
|
|
last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
|
|
last_user_sync_ts=now,
|
|
last_federation_update_ts=now,
|
|
status_msg=status_msg,
|
|
)
|
|
|
|
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
|
|
|
|
self.assertIsNotNone(new_state)
|
|
self.assertEqual(state, new_state)
|
|
|
|
|
|
class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
self.presence_handler = hs.get_presence_handler()
|
|
self.clock = hs.get_clock()
|
|
|
|
def test_external_process_timeout(self) -> None:
|
|
"""Test that if an external process doesn't update the records for a while
|
|
we time out their syncing users presence.
|
|
"""
|
|
process_id = "1"
|
|
user_id = "@test:server"
|
|
|
|
# Notify handler that a user is now syncing.
|
|
self.get_success(
|
|
self.presence_handler.update_external_syncs_row(
|
|
process_id, user_id, True, self.clock.time_msec()
|
|
)
|
|
)
|
|
|
|
# Check that if we wait a while without telling the handler the user has
|
|
# stopped syncing that their presence state doesn't get timed out.
|
|
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)
|
|
|
|
state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
self.assertEqual(state.state, PresenceState.ONLINE)
|
|
|
|
# Check that if the external process timeout fires, then the syncing
|
|
# user gets timed out
|
|
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)
|
|
|
|
state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
self.assertEqual(state.state, PresenceState.OFFLINE)
|
|
|
|
def test_user_goes_offline_by_timeout_status_msg_remain(self) -> None:
|
|
"""Test that if a user doesn't update the records for a while
|
|
users presence goes `OFFLINE` because of timeout and `status_msg` remains.
|
|
"""
|
|
user_id = "@test:server"
|
|
status_msg = "I'm here!"
|
|
|
|
# Mark user as online
|
|
self._set_presencestate_with_status_msg(
|
|
user_id, PresenceState.ONLINE, status_msg
|
|
)
|
|
|
|
# Check that if we wait a while without telling the handler the user has
|
|
# stopped syncing that their presence state doesn't get timed out.
|
|
self.reactor.advance(SYNC_ONLINE_TIMEOUT / 2)
|
|
|
|
state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
self.assertEqual(state.state, PresenceState.ONLINE)
|
|
self.assertEqual(state.status_msg, status_msg)
|
|
|
|
# Check that if the timeout fires, then the syncing user gets timed out
|
|
self.reactor.advance(SYNC_ONLINE_TIMEOUT)
|
|
|
|
state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
# status_msg should remain even after going offline
|
|
self.assertEqual(state.state, PresenceState.OFFLINE)
|
|
self.assertEqual(state.status_msg, status_msg)
|
|
|
|
def test_user_goes_offline_manually_with_no_status_msg(self) -> None:
|
|
"""Test that if a user change presence manually to `OFFLINE`
|
|
and no status is set, that `status_msg` is `None`.
|
|
"""
|
|
user_id = "@test:server"
|
|
status_msg = "I'm here!"
|
|
|
|
# Mark user as online
|
|
self._set_presencestate_with_status_msg(
|
|
user_id, PresenceState.ONLINE, status_msg
|
|
)
|
|
|
|
# Mark user as offline
|
|
self.get_success(
|
|
self.presence_handler.set_state(
|
|
UserID.from_string(user_id), {"presence": PresenceState.OFFLINE}
|
|
)
|
|
)
|
|
|
|
state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
self.assertEqual(state.state, PresenceState.OFFLINE)
|
|
self.assertEqual(state.status_msg, None)
|
|
|
|
def test_user_goes_offline_manually_with_status_msg(self) -> None:
|
|
"""Test that if a user change presence manually to `OFFLINE`
|
|
and a status is set, that `status_msg` appears.
|
|
"""
|
|
user_id = "@test:server"
|
|
status_msg = "I'm here!"
|
|
|
|
# Mark user as online
|
|
self._set_presencestate_with_status_msg(
|
|
user_id, PresenceState.ONLINE, status_msg
|
|
)
|
|
|
|
# Mark user as offline
|
|
self._set_presencestate_with_status_msg(
|
|
user_id, PresenceState.OFFLINE, "And now here."
|
|
)
|
|
|
|
def test_user_reset_online_with_no_status(self) -> None:
|
|
"""Test that if a user set again the presence manually
|
|
and no status is set, that `status_msg` is `None`.
|
|
"""
|
|
user_id = "@test:server"
|
|
status_msg = "I'm here!"
|
|
|
|
# Mark user as online
|
|
self._set_presencestate_with_status_msg(
|
|
user_id, PresenceState.ONLINE, status_msg
|
|
)
|
|
|
|
# Mark user as online again
|
|
self.get_success(
|
|
self.presence_handler.set_state(
|
|
UserID.from_string(user_id), {"presence": PresenceState.ONLINE}
|
|
)
|
|
)
|
|
|
|
state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
# status_msg should remain even after going offline
|
|
self.assertEqual(state.state, PresenceState.ONLINE)
|
|
self.assertEqual(state.status_msg, None)
|
|
|
|
def test_set_presence_with_status_msg_none(self) -> None:
|
|
"""Test that if a user set again the presence manually
|
|
and status is `None`, that `status_msg` is `None`.
|
|
"""
|
|
user_id = "@test:server"
|
|
status_msg = "I'm here!"
|
|
|
|
# Mark user as online
|
|
self._set_presencestate_with_status_msg(
|
|
user_id, PresenceState.ONLINE, status_msg
|
|
)
|
|
|
|
# Mark user as online and `status_msg = None`
|
|
self._set_presencestate_with_status_msg(user_id, PresenceState.ONLINE, None)
|
|
|
|
def test_set_presence_from_syncing_not_set(self) -> None:
|
|
"""Test that presence is not set by syncing if affect_presence is false"""
|
|
user_id = "@test:server"
|
|
status_msg = "I'm here!"
|
|
|
|
self._set_presencestate_with_status_msg(
|
|
user_id, PresenceState.UNAVAILABLE, status_msg
|
|
)
|
|
|
|
self.get_success(
|
|
self.presence_handler.user_syncing(user_id, False, PresenceState.ONLINE)
|
|
)
|
|
|
|
state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
# we should still be unavailable
|
|
self.assertEqual(state.state, PresenceState.UNAVAILABLE)
|
|
# and status message should still be the same
|
|
self.assertEqual(state.status_msg, status_msg)
|
|
|
|
def test_set_presence_from_syncing_is_set(self) -> None:
|
|
"""Test that presence is set by syncing if affect_presence is true"""
|
|
user_id = "@test:server"
|
|
status_msg = "I'm here!"
|
|
|
|
self._set_presencestate_with_status_msg(
|
|
user_id, PresenceState.UNAVAILABLE, status_msg
|
|
)
|
|
|
|
self.get_success(
|
|
self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE)
|
|
)
|
|
|
|
state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
# we should now be online
|
|
self.assertEqual(state.state, PresenceState.ONLINE)
|
|
|
|
def test_set_presence_from_syncing_keeps_status(self) -> None:
|
|
"""Test that presence set by syncing retains status message"""
|
|
user_id = "@test:server"
|
|
status_msg = "I'm here!"
|
|
|
|
self._set_presencestate_with_status_msg(
|
|
user_id, PresenceState.UNAVAILABLE, status_msg
|
|
)
|
|
|
|
self.get_success(
|
|
self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE)
|
|
)
|
|
|
|
state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
# our status message should be the same as it was before
|
|
self.assertEqual(state.status_msg, status_msg)
|
|
|
|
@parameterized.expand([(False,), (True,)])
|
|
@unittest.override_config(
|
|
{
|
|
"experimental_features": {
|
|
"msc3026_enabled": True,
|
|
},
|
|
}
|
|
)
|
|
def test_set_presence_from_syncing_keeps_busy(
|
|
self, test_with_workers: bool
|
|
) -> None:
|
|
"""Test that presence set by syncing doesn't affect busy status
|
|
|
|
Args:
|
|
test_with_workers: If True, check the presence state of the user by calling
|
|
/sync against a worker, rather than the main process.
|
|
"""
|
|
user_id = "@test:server"
|
|
status_msg = "I'm busy!"
|
|
|
|
# By default, we call /sync against the main process.
|
|
worker_to_sync_against = self.hs
|
|
if test_with_workers:
|
|
# Create a worker and use it to handle /sync traffic instead.
|
|
# This is used to test that presence changes get replicated from workers
|
|
# to the main process correctly.
|
|
worker_to_sync_against = self.make_worker_hs(
|
|
"synapse.app.generic_worker", {"worker_name": "presence_writer"}
|
|
)
|
|
|
|
# Set presence to BUSY
|
|
self._set_presencestate_with_status_msg(user_id, PresenceState.BUSY, status_msg)
|
|
|
|
# Perform a sync with a presence state other than busy. This should NOT change
|
|
# our presence status; we only change from busy if we explicitly set it via
|
|
# /presence/*.
|
|
self.get_success(
|
|
worker_to_sync_against.get_presence_handler().user_syncing(
|
|
user_id, True, PresenceState.ONLINE
|
|
)
|
|
)
|
|
|
|
# Check against the main process that the user's presence did not change.
|
|
state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
# we should still be busy
|
|
self.assertEqual(state.state, PresenceState.BUSY)
|
|
|
|
def _set_presencestate_with_status_msg(
|
|
self, user_id: str, state: str, status_msg: Optional[str]
|
|
) -> None:
|
|
"""Set a PresenceState and status_msg and check the result.
|
|
|
|
Args:
|
|
user_id: User for that the status is to be set.
|
|
state: The new PresenceState.
|
|
status_msg: Status message that is to be set.
|
|
"""
|
|
self.get_success(
|
|
self.presence_handler.set_state(
|
|
UserID.from_string(user_id),
|
|
{"presence": state, "status_msg": status_msg},
|
|
)
|
|
)
|
|
|
|
new_state = self.get_success(
|
|
self.presence_handler.get_state(UserID.from_string(user_id))
|
|
)
|
|
self.assertEqual(new_state.state, state)
|
|
self.assertEqual(new_state.status_msg, status_msg)
|
|
|
|
|
|
class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
self.presence_handler = hs.get_presence_handler()
|
|
self.clock = hs.get_clock()
|
|
self.instance_name = hs.get_instance_name()
|
|
|
|
self.queue = self.presence_handler.get_federation_queue()
|
|
|
|
def test_send_and_get(self) -> None:
|
|
state1 = UserPresenceState.default("@user1:test")
|
|
state2 = UserPresenceState.default("@user2:test")
|
|
state3 = UserPresenceState.default("@user3:test")
|
|
|
|
prev_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
|
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
|
|
|
now_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
rows, upto_token, limited = self.get_success(
|
|
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
|
)
|
|
|
|
self.assertEqual(upto_token, now_token)
|
|
self.assertFalse(limited)
|
|
|
|
expected_rows = [
|
|
(1, ("dest1", "@user1:test")),
|
|
(1, ("dest2", "@user1:test")),
|
|
(1, ("dest1", "@user2:test")),
|
|
(1, ("dest2", "@user2:test")),
|
|
(2, ("dest3", "@user3:test")),
|
|
]
|
|
|
|
self.assertCountEqual(rows, expected_rows)
|
|
|
|
now_token = self.queue.get_current_token(self.instance_name)
|
|
rows, upto_token, limited = self.get_success(
|
|
self.queue.get_replication_rows("master", upto_token, now_token, 10)
|
|
)
|
|
self.assertEqual(upto_token, now_token)
|
|
self.assertFalse(limited)
|
|
self.assertCountEqual(rows, [])
|
|
|
|
def test_send_and_get_split(self) -> None:
|
|
state1 = UserPresenceState.default("@user1:test")
|
|
state2 = UserPresenceState.default("@user2:test")
|
|
state3 = UserPresenceState.default("@user3:test")
|
|
|
|
prev_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
|
|
|
now_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
|
|
|
rows, upto_token, limited = self.get_success(
|
|
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
|
)
|
|
|
|
self.assertEqual(upto_token, now_token)
|
|
self.assertFalse(limited)
|
|
|
|
expected_rows = [
|
|
(1, ("dest1", "@user1:test")),
|
|
(1, ("dest2", "@user1:test")),
|
|
(1, ("dest1", "@user2:test")),
|
|
(1, ("dest2", "@user2:test")),
|
|
]
|
|
|
|
self.assertCountEqual(rows, expected_rows)
|
|
|
|
now_token = self.queue.get_current_token(self.instance_name)
|
|
rows, upto_token, limited = self.get_success(
|
|
self.queue.get_replication_rows("master", upto_token, now_token, 10)
|
|
)
|
|
|
|
self.assertEqual(upto_token, now_token)
|
|
self.assertFalse(limited)
|
|
|
|
expected_rows = [
|
|
(2, ("dest3", "@user3:test")),
|
|
]
|
|
|
|
self.assertCountEqual(rows, expected_rows)
|
|
|
|
def test_clear_queue_all(self) -> None:
|
|
state1 = UserPresenceState.default("@user1:test")
|
|
state2 = UserPresenceState.default("@user2:test")
|
|
state3 = UserPresenceState.default("@user3:test")
|
|
|
|
prev_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
|
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
|
|
|
self.reactor.advance(10 * 60 * 1000)
|
|
|
|
now_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
rows, upto_token, limited = self.get_success(
|
|
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
|
)
|
|
self.assertEqual(upto_token, now_token)
|
|
self.assertFalse(limited)
|
|
self.assertCountEqual(rows, [])
|
|
|
|
prev_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
|
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
|
|
|
now_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
rows, upto_token, limited = self.get_success(
|
|
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
|
)
|
|
self.assertEqual(upto_token, now_token)
|
|
self.assertFalse(limited)
|
|
|
|
expected_rows = [
|
|
(3, ("dest1", "@user1:test")),
|
|
(3, ("dest2", "@user1:test")),
|
|
(3, ("dest1", "@user2:test")),
|
|
(3, ("dest2", "@user2:test")),
|
|
(4, ("dest3", "@user3:test")),
|
|
]
|
|
|
|
self.assertCountEqual(rows, expected_rows)
|
|
|
|
def test_partially_clear_queue(self) -> None:
|
|
state1 = UserPresenceState.default("@user1:test")
|
|
state2 = UserPresenceState.default("@user2:test")
|
|
state3 = UserPresenceState.default("@user3:test")
|
|
|
|
prev_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
|
|
|
self.reactor.advance(2 * 60 * 1000)
|
|
|
|
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
|
|
|
self.reactor.advance(4 * 60 * 1000)
|
|
|
|
now_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
rows, upto_token, limited = self.get_success(
|
|
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
|
)
|
|
self.assertEqual(upto_token, now_token)
|
|
self.assertFalse(limited)
|
|
|
|
expected_rows = [
|
|
(2, ("dest3", "@user3:test")),
|
|
]
|
|
self.assertCountEqual(rows, [])
|
|
|
|
prev_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
|
|
self.queue.send_presence_to_destinations((state3,), ("dest3",))
|
|
|
|
now_token = self.queue.get_current_token(self.instance_name)
|
|
|
|
rows, upto_token, limited = self.get_success(
|
|
self.queue.get_replication_rows("master", prev_token, now_token, 10)
|
|
)
|
|
self.assertEqual(upto_token, now_token)
|
|
self.assertFalse(limited)
|
|
|
|
expected_rows = [
|
|
(3, ("dest1", "@user1:test")),
|
|
(3, ("dest2", "@user1:test")),
|
|
(3, ("dest1", "@user2:test")),
|
|
(3, ("dest2", "@user2:test")),
|
|
(4, ("dest3", "@user3:test")),
|
|
]
|
|
|
|
self.assertCountEqual(rows, expected_rows)
|
|
|
|
|
|
class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
|
"""Tests remote servers get told about presence of users in the room when
|
|
they join and when new local users join.
|
|
"""
|
|
|
|
user_id = "@test:server"
|
|
|
|
servlets = [room.register_servlets]
|
|
|
|
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
|
|
hs = self.setup_test_homeserver(
|
|
"server",
|
|
federation_sender=Mock(spec=FederationSender),
|
|
)
|
|
return hs
|
|
|
|
def default_config(self) -> JsonDict:
|
|
config = super().default_config()
|
|
# Enable federation sending on the main process.
|
|
config["federation_sender_instances"] = None
|
|
return config
|
|
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
self.federation_sender = cast(Mock, hs.get_federation_sender())
|
|
self.event_builder_factory = hs.get_event_builder_factory()
|
|
self.federation_event_handler = hs.get_federation_event_handler()
|
|
self.presence_handler = hs.get_presence_handler()
|
|
|
|
# self.event_builder_for_2 = EventBuilderFactory(hs)
|
|
# self.event_builder_for_2.hostname = "test2"
|
|
|
|
self.store = hs.get_datastores().main
|
|
self.state = hs.get_state_handler()
|
|
self._event_auth_handler = hs.get_event_auth_handler()
|
|
|
|
# We don't actually check signatures in tests, so lets just create a
|
|
# random key to use.
|
|
self.random_signing_key = generate_signing_key("ver")
|
|
|
|
def test_remote_joins(self) -> None:
|
|
# We advance time to something that isn't 0, as we use 0 as a special
|
|
# value.
|
|
self.reactor.advance(1000000000000)
|
|
|
|
# Create a room with two local users
|
|
room_id = self.helper.create_room_as(self.user_id)
|
|
self.helper.join(room_id, "@test2:server")
|
|
|
|
# Mark test2 as online, test will be offline with a last_active of 0
|
|
self.get_success(
|
|
self.presence_handler.set_state(
|
|
UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}
|
|
)
|
|
)
|
|
self.reactor.pump([0]) # Wait for presence updates to be handled
|
|
|
|
#
|
|
# Test that a new server gets told about existing presence
|
|
#
|
|
|
|
self.federation_sender.reset_mock()
|
|
|
|
# Add a new remote server to the room
|
|
self._add_new_user(room_id, "@alice:server2")
|
|
|
|
# When new server is joined we send it the local users presence states.
|
|
# We expect to only see user @test2:server, as @test:server is offline
|
|
# and has a zero last_active_ts
|
|
expected_state = self.get_success(
|
|
self.presence_handler.current_state_for_user("@test2:server")
|
|
)
|
|
self.assertEqual(expected_state.state, PresenceState.ONLINE)
|
|
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
|
|
destinations={"server2"}, states=[expected_state]
|
|
)
|
|
|
|
#
|
|
# Test that only the new server gets sent presence and not existing servers
|
|
#
|
|
|
|
self.federation_sender.reset_mock()
|
|
self._add_new_user(room_id, "@bob:server3")
|
|
|
|
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
|
|
destinations={"server3"}, states=[expected_state]
|
|
)
|
|
|
|
def test_remote_gets_presence_when_local_user_joins(self) -> None:
|
|
# We advance time to something that isn't 0, as we use 0 as a special
|
|
# value.
|
|
self.reactor.advance(1000000000000)
|
|
|
|
# Create a room with one local users
|
|
room_id = self.helper.create_room_as(self.user_id)
|
|
|
|
# Mark test as online
|
|
self.get_success(
|
|
self.presence_handler.set_state(
|
|
UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE}
|
|
)
|
|
)
|
|
|
|
# Mark test2 as online, test will be offline with a last_active of 0.
|
|
# Note we don't join them to the room yet
|
|
self.get_success(
|
|
self.presence_handler.set_state(
|
|
UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}
|
|
)
|
|
)
|
|
|
|
# Add servers to the room
|
|
self._add_new_user(room_id, "@alice:server2")
|
|
self._add_new_user(room_id, "@bob:server3")
|
|
|
|
self.reactor.pump([0]) # Wait for presence updates to be handled
|
|
|
|
#
|
|
# Test that when a local join happens remote servers get told about it
|
|
#
|
|
|
|
self.federation_sender.reset_mock()
|
|
|
|
# Join local user to room
|
|
self.helper.join(room_id, "@test2:server")
|
|
|
|
self.reactor.pump([0]) # Wait for presence updates to be handled
|
|
|
|
# We expect to only send test2 presence to server2 and server3
|
|
expected_state = self.get_success(
|
|
self.presence_handler.current_state_for_user("@test2:server")
|
|
)
|
|
self.assertEqual(expected_state.state, PresenceState.ONLINE)
|
|
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
|
|
destinations={"server2", "server3"}, states=[expected_state]
|
|
)
|
|
|
|
def _add_new_user(self, room_id: str, user_id: str) -> None:
|
|
"""Add new user to the room by creating an event and poking the federation API."""
|
|
|
|
hostname = get_domain_from_id(user_id)
|
|
|
|
room_version = self.get_success(self.store.get_room_version_id(room_id))
|
|
|
|
builder = EventBuilder(
|
|
state=self.state,
|
|
event_auth_handler=self._event_auth_handler,
|
|
store=self.store,
|
|
clock=self.clock,
|
|
hostname=hostname,
|
|
signing_key=self.random_signing_key,
|
|
room_version=KNOWN_ROOM_VERSIONS[room_version],
|
|
room_id=room_id,
|
|
type=EventTypes.Member,
|
|
sender=user_id,
|
|
state_key=user_id,
|
|
content={"membership": Membership.JOIN},
|
|
)
|
|
|
|
prev_event_ids = self.get_success(
|
|
self.store.get_latest_event_ids_in_room(room_id)
|
|
)
|
|
|
|
event = self.get_success(
|
|
builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
|
|
)
|
|
|
|
self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event))
|
|
|
|
# Check that it was successfully persisted.
|
|
self.get_success(self.store.get_event(event.event_id))
|
|
self.get_success(self.store.get_event(event.event_id))
|