forked from MirrorHub/synapse
a4c3a361b7
* Rate-limiting for registration * Add unit test for registration rate limiting * Add config parameters for rate limiting on auth endpoints * Doc * Fix doc of rate limiting function Co-Authored-By: babolivier <contact@brendanabolivier.com> * Incorporate review * Fix config parsing * Fix linting errors * Set default config for auth rate limiting * Fix tests * Add changelog * Advance reactor instead of mocked clock * Move parameters to registration specific config and give them more sensible default values * Remove unused config options * Don't mock the rate limiter in MAU tests * Rename _register_with_store into register_with_store * Make CI happy * Remove unused import * Update sample config * Fix ratelimiting test for py2 * Add non-guest test
180 lines
6.6 KiB
Python
180 lines
6.6 KiB
Python
import json
|
|
|
|
from synapse.api.constants import LoginType
|
|
from synapse.appservice import ApplicationService
|
|
from synapse.rest.client.v2_alpha.register import register_servlets
|
|
|
|
from tests import unittest
|
|
|
|
|
|
class RegisterRestServletTestCase(unittest.HomeserverTestCase):
    """Tests for the ``/_matrix/client/r0/register`` REST endpoint.

    Each test sends a POST request to the registration URL and checks the
    HTTP status code (and, where relevant, the JSON body) of the response.
    """

    servlets = [register_servlets]

    def make_homeserver(self, reactor, clock):
        """Build the homeserver under test and return it.

        Registration is switched on, and the 3PID requirement, auto-join
        rooms and captcha are switched off, so that individual tests only
        have to override the settings they care about.
        """
        self.url = b"/_matrix/client/r0/register"

        self.hs = self.setup_test_homeserver()
        self.hs.config.enable_registration = True
        self.hs.config.registrations_require_3pid = []
        self.hs.config.auto_join_rooms = []
        self.hs.config.enable_registration_captcha = False

        return self.hs

    def _send_register_request(self, url, content):
        """POST ``content`` to ``url``, render the request, return the channel."""
        request, channel = self.make_request(b"POST", url, content)
        self.render(request)
        return channel

    def _assert_body_contains(self, expected, body):
        """Assert that every key/value pair of ``expected`` appears in ``body``.

        Stands in for ``assertDictContainsSubset``, which the standard
        library deprecated in Python 3.2 and removed in Python 3.12.
        """
        for key, value in expected.items():
            self.assertIn(key, body)
            self.assertEqual(body[key], value)

    def test_POST_appservice_registration_valid(self):
        """An application service can register a user in its namespace."""
        user_id = "@as_user_kermit:test"
        as_token = "i_am_an_app_service"

        appservice = ApplicationService(
            as_token, self.hs.config.hostname,
            id="1234",
            namespaces={
                "users": [{"regex": r"@as_user.*", "exclusive": True}],
            },
        )

        # Make the datastore aware of the service so its token is recognised.
        self.hs.get_datastore().services_cache.append(appservice)
        request_data = json.dumps({"username": "as_user_kermit"})

        channel = self._send_register_request(
            self.url + b"?access_token=i_am_an_app_service", request_data
        )

        self.assertEqual(channel.result["code"], b"200", channel.result)
        det_data = {
            "user_id": user_id,
            "home_server": self.hs.hostname,
        }
        self._assert_body_contains(det_data, channel.json_body)

    def test_POST_appservice_registration_invalid(self):
        """An access token that matches no application service gets a 401."""
        # NOTE(review): this attribute is not read anywhere in this class;
        # kept in case the base class or a mixin relies on it.
        self.appservice = None  # no application service exists
        request_data = json.dumps({"username": "kermit"})
        channel = self._send_register_request(
            self.url + b"?access_token=i_am_an_app_service", request_data
        )

        self.assertEqual(channel.result["code"], b"401", channel.result)

    def test_POST_bad_password(self):
        """A non-string password is rejected with 400 "Invalid password"."""
        request_data = json.dumps({"username": "kermit", "password": 666})
        channel = self._send_register_request(self.url, request_data)

        self.assertEqual(channel.result["code"], b"400", channel.result)
        self.assertEqual(channel.json_body["error"], "Invalid password")

    def test_POST_bad_username(self):
        """A non-string username is rejected with 400 "Invalid username"."""
        request_data = json.dumps({"username": 777, "password": "monkey"})
        channel = self._send_register_request(self.url, request_data)

        self.assertEqual(channel.result["code"], b"400", channel.result)
        self.assertEqual(channel.json_body["error"], "Invalid username")

    def test_POST_user_valid(self):
        """A well-formed registration using dummy auth succeeds."""
        user_id = "@kermit:test"
        device_id = "frogfone"
        params = {
            "username": "kermit",
            "password": "monkey",
            "device_id": device_id,
            "auth": {"type": LoginType.DUMMY},
        }
        channel = self._send_register_request(self.url, json.dumps(params))

        det_data = {
            "user_id": user_id,
            "home_server": self.hs.hostname,
            "device_id": device_id,
        }
        self.assertEqual(channel.result["code"], b"200", channel.result)
        self._assert_body_contains(det_data, channel.json_body)

    def test_POST_disabled_registration(self):
        """Registration is refused with 403 when it is disabled in config."""
        self.hs.config.enable_registration = False
        request_data = json.dumps({"username": "kermit", "password": "monkey"})
        # NOTE(review): this attribute is not read anywhere in this class;
        # kept in case the base class or a mixin relies on it.
        self.auth_result = (None, {"username": "kermit", "password": "monkey"}, None)

        channel = self._send_register_request(self.url, request_data)

        self.assertEqual(channel.result["code"], b"403", channel.result)
        self.assertEqual(channel.json_body["error"], "Registration has been disabled")

    def test_POST_guest_registration(self):
        """Guest registration succeeds when guest access is allowed."""
        self.hs.config.macaroon_secret_key = "test"
        self.hs.config.allow_guest_access = True

        channel = self._send_register_request(self.url + b"?kind=guest", b"{}")

        det_data = {
            "home_server": self.hs.hostname,
            "device_id": "guest_device",
        }
        self.assertEqual(channel.result["code"], b"200", channel.result)
        self._assert_body_contains(det_data, channel.json_body)

    def test_POST_disabled_guest_registration(self):
        """Guest registration is refused with 403 when guest access is off."""
        self.hs.config.allow_guest_access = False

        channel = self._send_register_request(self.url + b"?kind=guest", b"{}")

        self.assertEqual(channel.result["code"], b"403", channel.result)
        self.assertEqual(channel.json_body["error"], "Guest access is disabled")

    def test_POST_ratelimiting_guest(self):
        """Guest registrations beyond the burst count are rate-limited.

        With a burst count of 5, the first five requests must return 200 and
        the sixth 429. After advancing the reactor by the ``retry_after_ms``
        reported in the 429 response, a further request must succeed.
        """
        burst_count = 5
        self.hs.config.rc_registration_request_burst_count = burst_count
        guest_url = self.url + b"?kind=guest"

        for _ in range(burst_count):
            channel = self._send_register_request(guest_url, b"{}")
            self.assertEqual(channel.result["code"], b"200", channel.result)

        # The request right after the burst is the one that must be refused.
        channel = self._send_register_request(guest_url, b"{}")
        self.assertEqual(channel.result["code"], b"429", channel.result)
        retry_after_ms = int(channel.json_body["retry_after_ms"])

        # Float divisor keeps this a true division on Python 2 as well.
        self.reactor.advance(retry_after_ms / 1000.0)

        channel = self._send_register_request(guest_url, b"{}")

        self.assertEqual(channel.result["code"], b"200", channel.result)

    def test_POST_ratelimiting(self):
        """Non-guest registrations beyond the burst count are rate-limited.

        With a burst count of 5, the first five registrations (each with a
        distinct username) must return 200 and the sixth 429. After advancing
        the reactor by the reported ``retry_after_ms``, a request must succeed
        again.
        """
        burst_count = 5
        self.hs.config.rc_registration_request_burst_count = burst_count

        def register_numbered_user(index):
            # A distinct username per attempt, so that a failure can only
            # come from the rate limiter and not from a name clash.
            params = {
                "username": "kermit" + str(index),
                "password": "monkey",
                "device_id": "frogfone",
                "auth": {"type": LoginType.DUMMY},
            }
            return self._send_register_request(self.url, json.dumps(params))

        for i in range(burst_count):
            channel = register_numbered_user(i)
            self.assertEqual(channel.result["code"], b"200", channel.result)

        # The request right after the burst is the one that must be refused.
        channel = register_numbered_user(burst_count)
        self.assertEqual(channel.result["code"], b"429", channel.result)
        retry_after_ms = int(channel.json_body["retry_after_ms"])

        # Float divisor keeps this a true division on Python 2 as well.
        self.reactor.advance(retry_after_ms / 1000.0)

        # NOTE(review): the follow-up request is a *guest* registration even
        # though this is the non-guest test; presumably both kinds share one
        # limiter, so it still shows the limit has lapsed -- confirm.
        channel = self._send_register_request(self.url + b"?kind=guest", b"{}")

        self.assertEqual(channel.result["code"], b"200", channel.result)
|