0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-14 11:13:49 +01:00

Merge pull request #3680 from matrix-org/neilj/server_notices_on_blocking

server notices on resource limit blocking
This commit is contained in:
Erik Johnston 2018-08-22 17:18:28 +01:00 committed by GitHub
commit a5806aba27
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 408 additions and 32 deletions

1
changelog.d/3680.feature Normal file
View file

@ -0,0 +1 @@
Server notices for resource limit blocking

View file

@ -783,10 +783,16 @@ class Auth(object):
user_id(str|None): If present, checks for presence against existing user_id(str|None): If present, checks for presence against existing
MAU cohort MAU cohort
""" """
# Never fail an auth check for the server notices users
# This can be a problem where event creation is prohibited due to blocking
if user_id == self.hs.config.server_notices_mxid:
return
if self.hs.config.hs_disabled: if self.hs.config.hs_disabled:
raise ResourceLimitError( raise ResourceLimitError(
403, self.hs.config.hs_disabled_message, 403, self.hs.config.hs_disabled_message,
errcode=Codes.RESOURCE_LIMIT_EXCEED, errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
admin_uri=self.hs.config.admin_uri, admin_uri=self.hs.config.admin_uri,
limit_type=self.hs.config.hs_disabled_limit_type limit_type=self.hs.config.hs_disabled_limit_type
) )
@ -803,6 +809,6 @@ class Auth(object):
403, "Monthly Active User Limit Exceeded", 403, "Monthly Active User Limit Exceeded",
admin_uri=self.hs.config.admin_uri, admin_uri=self.hs.config.admin_uri,
errcode=Codes.RESOURCE_LIMIT_EXCEED, errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
limit_type="monthly_active_user" limit_type="monthly_active_user"
) )

View file

@ -78,6 +78,7 @@ class EventTypes(object):
Name = "m.room.name" Name = "m.room.name"
ServerACL = "m.room.server_acl" ServerACL = "m.room.server_acl"
Pinned = "m.room.pinned_events"
class RejectedReason(object): class RejectedReason(object):
@ -108,3 +109,6 @@ DEFAULT_ROOM_VERSION = RoomVersions.V1
# vdh-test-version is a placeholder to get room versioning support working and tested # vdh-test-version is a placeholder to get room versioning support working and tested
# until we have a working v2. # until we have a working v2.
KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST} KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST}
ServerNoticeMsgType = "m.server_notice"
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"

View file

@ -56,7 +56,7 @@ class Codes(object):
SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED" SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN" CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
RESOURCE_LIMIT_EXCEED = "M_RESOURCE_LIMIT_EXCEED" RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED"
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION" UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION" INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
@ -238,7 +238,7 @@ class ResourceLimitError(SynapseError):
""" """
def __init__( def __init__(
self, code, msg, self, code, msg,
errcode=Codes.RESOURCE_LIMIT_EXCEED, errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
admin_uri=None, admin_uri=None,
limit_type=None, limit_type=None,
): ):

View file

@ -0,0 +1,191 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from six import iteritems
from twisted.internet import defer
from synapse.api.constants import (
EventTypes,
ServerNoticeLimitReached,
ServerNoticeMsgType,
)
from synapse.api.errors import AuthError, ResourceLimitError, SynapseError
from synapse.server_notices.server_notices_manager import SERVER_NOTICE_ROOM_TAG
logger = logging.getLogger(__name__)
class ResourceLimitsServerNotices(object):
""" Keeps track of whether the server has reached it's resource limit and
ensures that the client is kept up to date.
"""
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""
self._server_notices_manager = hs.get_server_notices_manager()
self._store = hs.get_datastore()
self._auth = hs.get_auth()
self._config = hs.config
self._resouce_limited = False
self._message_handler = hs.get_message_handler()
self._state = hs.get_state_handler()
@defer.inlineCallbacks
def maybe_send_server_notice_to_user(self, user_id):
"""Check if we need to send a notice to this user, this will be true in
two cases.
1. The server has reached its limit does not reflect this
2. The room state indicates that the server has reached its limit when
actually the server is fine
Args:
user_id (str): user to check
Returns:
Deferred
"""
if self._config.hs_disabled is True:
return
if self._config.limit_usage_by_mau is False:
return
timestamp = yield self._store.user_last_seen_monthly_active(user_id)
if timestamp is None:
# This user will be blocked from receiving the notice anyway.
# In practice, not sure we can ever get here
return
# Determine current state of room
room_id = yield self._server_notices_manager.get_notice_room_for_user(user_id)
yield self._check_and_set_tags(user_id, room_id)
currently_blocked, ref_events = yield self._is_room_currently_blocked(room_id)
try:
# Normally should always pass in user_id if you have it, but in
# this case are checking what would happen to other users if they
# were to arrive.
try:
yield self._auth.check_auth_blocking()
is_auth_blocking = False
except ResourceLimitError as e:
is_auth_blocking = True
event_content = e.msg
event_limit_type = e.limit_type
if currently_blocked and not is_auth_blocking:
# Room is notifying of a block, when it ought not to be.
# Remove block notification
content = {
"pinned": ref_events
}
yield self._server_notices_manager.send_notice(
user_id, content, EventTypes.Pinned, '',
)
elif not currently_blocked and is_auth_blocking:
# Room is not notifying of a block, when it ought to be.
# Add block notification
content = {
'body': event_content,
'msgtype': ServerNoticeMsgType,
'server_notice_type': ServerNoticeLimitReached,
'admin_uri': self._config.admin_uri,
'limit_type': event_limit_type
}
event = yield self._server_notices_manager.send_notice(
user_id, content, EventTypes.Message,
)
content = {
"pinned": [
event.event_id,
]
}
yield self._server_notices_manager.send_notice(
user_id, content, EventTypes.Pinned, '',
)
except SynapseError as e:
logger.error("Error sending resource limits server notice: %s", e)
@defer.inlineCallbacks
def _check_and_set_tags(self, user_id, room_id):
"""
Since server notices rooms were originally not with tags,
important to check that tags have been set correctly
Args:
user_id(str): the user in question
room_id(str): the server notices room for that user
"""
tags = yield self._store.get_tags_for_user(user_id)
server_notices_tags = tags.get(room_id)
need_to_set_tag = True
if server_notices_tags:
if server_notices_tags.get(SERVER_NOTICE_ROOM_TAG):
# tag already present, nothing to do here
need_to_set_tag = False
if need_to_set_tag:
yield self._store.add_tag_to_room(
user_id, room_id, SERVER_NOTICE_ROOM_TAG, None
)
@defer.inlineCallbacks
def _is_room_currently_blocked(self, room_id):
"""
Determines if the room is currently blocked
Args:
room_id(str): The room id of the server notices room
Returns:
bool: Is the room currently blocked
list: The list of pinned events that are unrelated to limit blocking
This list can be used as a convenience in the case where the block
is to be lifted and the remaining pinned event references need to be
preserved
"""
currently_blocked = False
pinned_state_event = None
try:
pinned_state_event = yield self._state.get_current_state(
room_id, event_type=EventTypes.Pinned
)
except AuthError:
# The user has yet to join the server notices room
pass
referenced_events = []
if pinned_state_event is not None:
referenced_events = pinned_state_event.content.get('pinned')
events = yield self._store.get_events(referenced_events)
for event_id, event in iteritems(events):
if event.type != EventTypes.Message:
continue
if event.content.get("msgtype") == ServerNoticeMsgType:
currently_blocked = True
# remove event in case we need to disable blocking later on.
if event_id in referenced_events:
referenced_events.remove(event.event_id)
defer.returnValue((currently_blocked, referenced_events))

View file

@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
SERVER_NOTICE_ROOM_TAG = "m.server_notice"
class ServerNoticesManager(object): class ServerNoticesManager(object):
def __init__(self, hs): def __init__(self, hs):
@ -46,7 +48,10 @@ class ServerNoticesManager(object):
return self._config.server_notices_mxid is not None return self._config.server_notices_mxid is not None
@defer.inlineCallbacks @defer.inlineCallbacks
def send_notice(self, user_id, event_content): def send_notice(
self, user_id, event_content,
type=EventTypes.Message, state_key=None
):
"""Send a notice to the given user """Send a notice to the given user
Creates the server notices room, if none exists. Creates the server notices room, if none exists.
@ -54,9 +59,11 @@ class ServerNoticesManager(object):
Args: Args:
user_id (str): mxid of user to send event to. user_id (str): mxid of user to send event to.
event_content (dict): content of event to send event_content (dict): content of event to send
type(EventTypes): type of event
is_state_event(bool): Is the event a state event
Returns: Returns:
Deferred[None] Deferred[FrozenEvent]
""" """
room_id = yield self.get_notice_room_for_user(user_id) room_id = yield self.get_notice_room_for_user(user_id)
@ -65,15 +72,20 @@ class ServerNoticesManager(object):
logger.info("Sending server notice to %s", user_id) logger.info("Sending server notice to %s", user_id)
yield self._event_creation_handler.create_and_send_nonmember_event( event_dict = {
requester, { "type": type,
"type": EventTypes.Message, "room_id": room_id,
"room_id": room_id, "sender": system_mxid,
"sender": system_mxid, "content": event_content,
"content": event_content, }
},
ratelimit=False, if state_key is not None:
event_dict['state_key'] = state_key
res = yield self._event_creation_handler.create_and_send_nonmember_event(
requester, event_dict, ratelimit=False,
) )
defer.returnValue(res)
@cachedInlineCallbacks() @cachedInlineCallbacks()
def get_notice_room_for_user(self, user_id): def get_notice_room_for_user(self, user_id):
@ -141,6 +153,9 @@ class ServerNoticesManager(object):
creator_join_profile=join_profile, creator_join_profile=join_profile,
) )
room_id = info['room_id'] room_id = info['room_id']
yield self._store.add_tag_to_room(
user_id, room_id, SERVER_NOTICE_ROOM_TAG, None
)
logger.info("Created server notices room %s for %s", room_id, user_id) logger.info("Created server notices room %s for %s", room_id, user_id)
defer.returnValue(room_id) defer.returnValue(room_id)

View file

@ -12,7 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from twisted.internet import defer
from synapse.server_notices.consent_server_notices import ConsentServerNotices from synapse.server_notices.consent_server_notices import ConsentServerNotices
from synapse.server_notices.resource_limits_server_notices import (
ResourceLimitsServerNotices,
)
class ServerNoticesSender(object): class ServerNoticesSender(object):
@ -25,34 +30,34 @@ class ServerNoticesSender(object):
Args: Args:
hs (synapse.server.HomeServer): hs (synapse.server.HomeServer):
""" """
# todo: it would be nice to make this more dynamic self._server_notices = (
self._consent_server_notices = ConsentServerNotices(hs) ConsentServerNotices(hs),
ResourceLimitsServerNotices(hs)
)
@defer.inlineCallbacks
def on_user_syncing(self, user_id): def on_user_syncing(self, user_id):
"""Called when the user performs a sync operation. """Called when the user performs a sync operation.
Args: Args:
user_id (str): mxid of user who synced user_id (str): mxid of user who synced
Returns:
Deferred
""" """
return self._consent_server_notices.maybe_send_server_notice_to_user( for sn in self._server_notices:
user_id, yield sn.maybe_send_server_notice_to_user(
) user_id,
)
@defer.inlineCallbacks
def on_user_ip(self, user_id): def on_user_ip(self, user_id):
"""Called on the master when a worker process saw a client request. """Called on the master when a worker process saw a client request.
Args: Args:
user_id (str): mxid user_id (str): mxid
Returns:
Deferred
""" """
# The synchrotrons use a stubbed version of ServerNoticesSender, so # The synchrotrons use a stubbed version of ServerNoticesSender, so
# we check for notices to send to the user in on_user_ip as well as # we check for notices to send to the user in on_user_ip as well as
# in on_user_syncing # in on_user_syncing
return self._consent_server_notices.maybe_send_server_notice_to_user( for sn in self._server_notices:
user_id, yield sn.maybe_send_server_notice_to_user(
) user_id,
)

View file

@ -458,7 +458,7 @@ class AuthTestCase(unittest.TestCase):
with self.assertRaises(ResourceLimitError) as e: with self.assertRaises(ResourceLimitError) as e:
yield self.auth.check_auth_blocking() yield self.auth.check_auth_blocking()
self.assertEquals(e.exception.admin_uri, self.hs.config.admin_uri) self.assertEquals(e.exception.admin_uri, self.hs.config.admin_uri)
self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED) self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
self.assertEquals(e.exception.code, 403) self.assertEquals(e.exception.code, 403)
# Ensure does not throw an error # Ensure does not throw an error
@ -474,5 +474,13 @@ class AuthTestCase(unittest.TestCase):
with self.assertRaises(ResourceLimitError) as e: with self.assertRaises(ResourceLimitError) as e:
yield self.auth.check_auth_blocking() yield self.auth.check_auth_blocking()
self.assertEquals(e.exception.admin_uri, self.hs.config.admin_uri) self.assertEquals(e.exception.admin_uri, self.hs.config.admin_uri)
self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED) self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
self.assertEquals(e.exception.code, 403) self.assertEquals(e.exception.code, 403)
@defer.inlineCallbacks
def test_server_notices_mxid_special_cased(self):
self.hs.config.hs_disabled = True
user = "@user:server"
self.hs.config.server_notices_mxid = user
self.hs.config.hs_disabled_message = "Reason for being disabled"
yield self.auth.check_auth_blocking(user)

View file

@ -51,7 +51,7 @@ class SyncTestCase(tests.unittest.TestCase):
self.hs.config.hs_disabled = True self.hs.config.hs_disabled = True
with self.assertRaises(ResourceLimitError) as e: with self.assertRaises(ResourceLimitError) as e:
yield self.sync_handler.wait_for_sync_for_user(sync_config) yield self.sync_handler.wait_for_sync_for_user(sync_config)
self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED) self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
self.hs.config.hs_disabled = False self.hs.config.hs_disabled = False
@ -59,7 +59,7 @@ class SyncTestCase(tests.unittest.TestCase):
with self.assertRaises(ResourceLimitError) as e: with self.assertRaises(ResourceLimitError) as e:
yield self.sync_handler.wait_for_sync_for_user(sync_config) yield self.sync_handler.wait_for_sync_for_user(sync_config)
self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED) self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
def _generate_sync_config(self, user_id): def _generate_sync_config(self, user_id):
return SyncConfig( return SyncConfig(

View file

@ -0,0 +1,146 @@
from mock import Mock
from twisted.internet import defer
from synapse.api.constants import EventTypes, ServerNoticeMsgType
from synapse.api.errors import ResourceLimitError
from synapse.handlers.auth import AuthHandler
from synapse.server_notices.resource_limits_server_notices import (
ResourceLimitsServerNotices,
)
from tests import unittest
from tests.utils import setup_test_homeserver
class AuthHandlers(object):
def __init__(self, hs):
self.auth_handler = AuthHandler(hs)
class TestResourceLimitsServerNotices(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
self.hs = yield setup_test_homeserver(self.addCleanup, handlers=None)
self.hs.handlers = AuthHandlers(self.hs)
self.auth_handler = self.hs.handlers.auth_handler
self.server_notices_sender = self.hs.get_server_notices_sender()
# relying on [1] is far from ideal, but the only case where
# ResourceLimitsServerNotices class needs to be isolated is this test,
# general code should never have a reason to do so ...
self._rlsn = self.server_notices_sender._server_notices[1]
if not isinstance(self._rlsn, ResourceLimitsServerNotices):
raise Exception("Failed to find reference to ResourceLimitsServerNotices")
self._rlsn._store.user_last_seen_monthly_active = Mock(
return_value=defer.succeed(1000)
)
self._send_notice = self._rlsn._server_notices_manager.send_notice
self._rlsn._server_notices_manager.send_notice = Mock()
self._rlsn._state.get_current_state = Mock(return_value=defer.succeed(None))
self._rlsn._store.get_events = Mock(return_value=defer.succeed({}))
self._send_notice = self._rlsn._server_notices_manager.send_notice
self.hs.config.limit_usage_by_mau = True
self.user_id = "@user_id:test"
# self.server_notices_mxid = "@server:test"
# self.server_notices_mxid_display_name = None
# self.server_notices_mxid_avatar_url = None
# self.server_notices_room_name = "Server Notices"
self._rlsn._server_notices_manager.get_notice_room_for_user = Mock(
returnValue=""
)
self._rlsn._store.add_tag_to_room = Mock()
self.hs.config.admin_uri = "mailto:user@test.com"
@defer.inlineCallbacks
def test_maybe_send_server_notice_to_user_flag_off(self):
"""Tests cases where the flags indicate nothing to do"""
# test hs disabled case
self.hs.config.hs_disabled = True
yield self._rlsn.maybe_send_server_notice_to_user(self.user_id)
self._send_notice.assert_not_called()
# Test when mau limiting disabled
self.hs.config.hs_disabled = False
self.hs.limit_usage_by_mau = False
yield self._rlsn.maybe_send_server_notice_to_user(self.user_id)
self._send_notice.assert_not_called()
@defer.inlineCallbacks
def test_maybe_send_server_notice_to_user_remove_blocked_notice(self):
"""Test when user has blocked notice, but should have it removed"""
self._rlsn._auth.check_auth_blocking = Mock()
mock_event = Mock(
type=EventTypes.Message,
content={"msgtype": ServerNoticeMsgType},
)
self._rlsn._store.get_events = Mock(return_value=defer.succeed(
{"123": mock_event}
))
yield self._rlsn.maybe_send_server_notice_to_user(self.user_id)
# Would be better to check the content, but once == remove blocking event
self._send_notice.assert_called_once()
@defer.inlineCallbacks
def test_maybe_send_server_notice_to_user_remove_blocked_notice_noop(self):
"""Test when user has blocked notice, but notice ought to be there (NOOP)"""
self._rlsn._auth.check_auth_blocking = Mock(
side_effect=ResourceLimitError(403, 'foo')
)
mock_event = Mock(
type=EventTypes.Message,
content={"msgtype": ServerNoticeMsgType},
)
self._rlsn._store.get_events = Mock(return_value=defer.succeed(
{"123": mock_event}
))
yield self._rlsn.maybe_send_server_notice_to_user(self.user_id)
self._send_notice.assert_not_called()
@defer.inlineCallbacks
def test_maybe_send_server_notice_to_user_add_blocked_notice(self):
"""Test when user does not have blocked notice, but should have one"""
self._rlsn._auth.check_auth_blocking = Mock(
side_effect=ResourceLimitError(403, 'foo')
)
yield self._rlsn.maybe_send_server_notice_to_user(self.user_id)
# Would be better to check contents, but 2 calls == set blocking event
self.assertTrue(self._send_notice.call_count == 2)
@defer.inlineCallbacks
def test_maybe_send_server_notice_to_user_add_blocked_notice_noop(self):
"""Test when user does not have blocked notice, nor should they (NOOP)"""
self._rlsn._auth.check_auth_blocking = Mock()
yield self._rlsn.maybe_send_server_notice_to_user(self.user_id)
self._send_notice.assert_not_called()
@defer.inlineCallbacks
def test_maybe_send_server_notice_to_user_not_in_mau_cohort(self):
"""Test when user is not part of the MAU cohort - this should not ever
happen - but ...
"""
self._rlsn._auth.check_auth_blocking = Mock()
self._rlsn._store.user_last_seen_monthly_active = Mock(
return_value=defer.succeed(None)
)
yield self._rlsn.maybe_send_server_notice_to_user(self.user_id)
self._send_notice.assert_not_called()