Simplify the user admin API tests (#11048)

This commit is contained in:
Dirk Klimpel 2021-10-12 21:38:48 +02:00 committed by GitHub
parent 5dcacdf6d1
commit 988de0afb0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 147 additions and 255 deletions

1
changelog.d/11048.misc Normal file
View file

@ -0,0 +1 @@
Simplify the user admin API tests.

View file

@ -14,14 +14,13 @@
import hashlib import hashlib
import hmac import hmac
import json
import os import os
import urllib.parse import urllib.parse
from binascii import unhexlify from binascii import unhexlify
from typing import List, Optional from typing import List, Optional
from unittest.mock import Mock, patch from unittest.mock import Mock, patch
from parameterized import parameterized from parameterized import parameterized, parameterized_class
import synapse.rest.admin import synapse.rest.admin
from synapse.api.constants import UserTypes from synapse.api.constants import UserTypes
@ -104,8 +103,8 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
# 59 seconds # 59 seconds
self.reactor.advance(59) self.reactor.advance(59)
body = json.dumps({"nonce": nonce}) body = {"nonce": nonce}
channel = self.make_request("POST", self.url, body.encode("utf8")) channel = self.make_request("POST", self.url, body)
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("username must be specified", channel.json_body["error"]) self.assertEqual("username must be specified", channel.json_body["error"])
@ -113,7 +112,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
# 61 seconds # 61 seconds
self.reactor.advance(2) self.reactor.advance(2)
channel = self.make_request("POST", self.url, body.encode("utf8")) channel = self.make_request("POST", self.url, body)
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("unrecognised nonce", channel.json_body["error"]) self.assertEqual("unrecognised nonce", channel.json_body["error"])
@ -129,18 +128,16 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac.update(b"notthenonce\x00bob\x00abc123\x00admin") want_mac.update(b"notthenonce\x00bob\x00abc123\x00admin")
want_mac = want_mac.hexdigest() want_mac = want_mac.hexdigest()
body = json.dumps( body = {
{
"nonce": nonce, "nonce": nonce,
"username": "bob", "username": "bob",
"password": "abc123", "password": "abc123",
"admin": True, "admin": True,
"mac": want_mac, "mac": want_mac,
} }
) channel = self.make_request("POST", self.url, body)
channel = self.make_request("POST", self.url, body.encode("utf8"))
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(403, channel.code, msg=channel.json_body)
self.assertEqual("HMAC incorrect", channel.json_body["error"]) self.assertEqual("HMAC incorrect", channel.json_body["error"])
def test_register_correct_nonce(self): def test_register_correct_nonce(self):
@ -157,8 +154,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
) )
want_mac = want_mac.hexdigest() want_mac = want_mac.hexdigest()
body = json.dumps( body = {
{
"nonce": nonce, "nonce": nonce,
"username": "bob", "username": "bob",
"password": "abc123", "password": "abc123",
@ -166,8 +162,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
"user_type": UserTypes.SUPPORT, "user_type": UserTypes.SUPPORT,
"mac": want_mac, "mac": want_mac,
} }
) channel = self.make_request("POST", self.url, body)
channel = self.make_request("POST", self.url, body.encode("utf8"))
self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["user_id"]) self.assertEqual("@bob:test", channel.json_body["user_id"])
@ -183,22 +178,20 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac.update(nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin") want_mac.update(nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin")
want_mac = want_mac.hexdigest() want_mac = want_mac.hexdigest()
body = json.dumps( body = {
{
"nonce": nonce, "nonce": nonce,
"username": "bob", "username": "bob",
"password": "abc123", "password": "abc123",
"admin": True, "admin": True,
"mac": want_mac, "mac": want_mac,
} }
) channel = self.make_request("POST", self.url, body)
channel = self.make_request("POST", self.url, body.encode("utf8"))
self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["user_id"]) self.assertEqual("@bob:test", channel.json_body["user_id"])
# Now, try and reuse it # Now, try and reuse it
channel = self.make_request("POST", self.url, body.encode("utf8")) channel = self.make_request("POST", self.url, body)
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("unrecognised nonce", channel.json_body["error"]) self.assertEqual("unrecognised nonce", channel.json_body["error"])
@ -218,9 +211,8 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
# Nonce check # Nonce check
# #
# Must be present # Must be an empty body present
body = json.dumps({}) channel = self.make_request("POST", self.url, {})
channel = self.make_request("POST", self.url, body.encode("utf8"))
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("nonce must be specified", channel.json_body["error"]) self.assertEqual("nonce must be specified", channel.json_body["error"])
@ -230,29 +222,28 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
# #
# Must be present # Must be present
body = json.dumps({"nonce": nonce()}) channel = self.make_request("POST", self.url, {"nonce": nonce()})
channel = self.make_request("POST", self.url, body.encode("utf8"))
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("username must be specified", channel.json_body["error"]) self.assertEqual("username must be specified", channel.json_body["error"])
# Must be a string # Must be a string
body = json.dumps({"nonce": nonce(), "username": 1234}) body = {"nonce": nonce(), "username": 1234}
channel = self.make_request("POST", self.url, body.encode("utf8")) channel = self.make_request("POST", self.url, body)
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Invalid username", channel.json_body["error"]) self.assertEqual("Invalid username", channel.json_body["error"])
# Must not have null bytes # Must not have null bytes
body = json.dumps({"nonce": nonce(), "username": "abcd\u0000"}) body = {"nonce": nonce(), "username": "abcd\u0000"}
channel = self.make_request("POST", self.url, body.encode("utf8")) channel = self.make_request("POST", self.url, body)
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Invalid username", channel.json_body["error"]) self.assertEqual("Invalid username", channel.json_body["error"])
# Must not have null bytes # Must not have null bytes
body = json.dumps({"nonce": nonce(), "username": "a" * 1000}) body = {"nonce": nonce(), "username": "a" * 1000}
channel = self.make_request("POST", self.url, body.encode("utf8")) channel = self.make_request("POST", self.url, body)
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Invalid username", channel.json_body["error"]) self.assertEqual("Invalid username", channel.json_body["error"])
@ -262,29 +253,29 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
# #
# Must be present # Must be present
body = json.dumps({"nonce": nonce(), "username": "a"}) body = {"nonce": nonce(), "username": "a"}
channel = self.make_request("POST", self.url, body.encode("utf8")) channel = self.make_request("POST", self.url, body)
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("password must be specified", channel.json_body["error"]) self.assertEqual("password must be specified", channel.json_body["error"])
# Must be a string # Must be a string
body = json.dumps({"nonce": nonce(), "username": "a", "password": 1234}) body = {"nonce": nonce(), "username": "a", "password": 1234}
channel = self.make_request("POST", self.url, body.encode("utf8")) channel = self.make_request("POST", self.url, body)
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Invalid password", channel.json_body["error"]) self.assertEqual("Invalid password", channel.json_body["error"])
# Must not have null bytes # Must not have null bytes
body = json.dumps({"nonce": nonce(), "username": "a", "password": "abcd\u0000"}) body = {"nonce": nonce(), "username": "a", "password": "abcd\u0000"}
channel = self.make_request("POST", self.url, body.encode("utf8")) channel = self.make_request("POST", self.url, body)
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Invalid password", channel.json_body["error"]) self.assertEqual("Invalid password", channel.json_body["error"])
# Super long # Super long
body = json.dumps({"nonce": nonce(), "username": "a", "password": "A" * 1000}) body = {"nonce": nonce(), "username": "a", "password": "A" * 1000}
channel = self.make_request("POST", self.url, body.encode("utf8")) channel = self.make_request("POST", self.url, body)
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Invalid password", channel.json_body["error"]) self.assertEqual("Invalid password", channel.json_body["error"])
@ -294,15 +285,13 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
# #
# Invalid user_type # Invalid user_type
body = json.dumps( body = {
{
"nonce": nonce(), "nonce": nonce(),
"username": "a", "username": "a",
"password": "1234", "password": "1234",
"user_type": "invalid", "user_type": "invalid",
} }
) channel = self.make_request("POST", self.url, body)
channel = self.make_request("POST", self.url, body.encode("utf8"))
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Invalid user type", channel.json_body["error"]) self.assertEqual("Invalid user type", channel.json_body["error"])
@ -320,10 +309,14 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac.update(nonce.encode("ascii") + b"\x00bob1\x00abc123\x00notadmin") want_mac.update(nonce.encode("ascii") + b"\x00bob1\x00abc123\x00notadmin")
want_mac = want_mac.hexdigest() want_mac = want_mac.hexdigest()
body = json.dumps( body = {
{"nonce": nonce, "username": "bob1", "password": "abc123", "mac": want_mac} "nonce": nonce,
) "username": "bob1",
channel = self.make_request("POST", self.url, body.encode("utf8")) "password": "abc123",
"mac": want_mac,
}
channel = self.make_request("POST", self.url, body)
self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual("@bob1:test", channel.json_body["user_id"]) self.assertEqual("@bob1:test", channel.json_body["user_id"])
@ -340,16 +333,14 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac.update(nonce.encode("ascii") + b"\x00bob2\x00abc123\x00notadmin") want_mac.update(nonce.encode("ascii") + b"\x00bob2\x00abc123\x00notadmin")
want_mac = want_mac.hexdigest() want_mac = want_mac.hexdigest()
body = json.dumps( body = {
{
"nonce": nonce, "nonce": nonce,
"username": "bob2", "username": "bob2",
"displayname": None, "displayname": None,
"password": "abc123", "password": "abc123",
"mac": want_mac, "mac": want_mac,
} }
) channel = self.make_request("POST", self.url, body)
channel = self.make_request("POST", self.url, body.encode("utf8"))
self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual("@bob2:test", channel.json_body["user_id"]) self.assertEqual("@bob2:test", channel.json_body["user_id"])
@ -366,22 +357,20 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac.update(nonce.encode("ascii") + b"\x00bob3\x00abc123\x00notadmin") want_mac.update(nonce.encode("ascii") + b"\x00bob3\x00abc123\x00notadmin")
want_mac = want_mac.hexdigest() want_mac = want_mac.hexdigest()
body = json.dumps( body = {
{
"nonce": nonce, "nonce": nonce,
"username": "bob3", "username": "bob3",
"displayname": "", "displayname": "",
"password": "abc123", "password": "abc123",
"mac": want_mac, "mac": want_mac,
} }
) channel = self.make_request("POST", self.url, body)
channel = self.make_request("POST", self.url, body.encode("utf8"))
self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual("@bob3:test", channel.json_body["user_id"]) self.assertEqual("@bob3:test", channel.json_body["user_id"])
channel = self.make_request("GET", "/profile/@bob3:test/displayname") channel = self.make_request("GET", "/profile/@bob3:test/displayname")
self.assertEqual(404, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(404, channel.code, msg=channel.json_body)
# set displayname # set displayname
channel = self.make_request("GET", self.url) channel = self.make_request("GET", self.url)
@ -391,16 +380,14 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac.update(nonce.encode("ascii") + b"\x00bob4\x00abc123\x00notadmin") want_mac.update(nonce.encode("ascii") + b"\x00bob4\x00abc123\x00notadmin")
want_mac = want_mac.hexdigest() want_mac = want_mac.hexdigest()
body = json.dumps( body = {
{
"nonce": nonce, "nonce": nonce,
"username": "bob4", "username": "bob4",
"displayname": "Bob's Name", "displayname": "Bob's Name",
"password": "abc123", "password": "abc123",
"mac": want_mac, "mac": want_mac,
} }
) channel = self.make_request("POST", self.url, body)
channel = self.make_request("POST", self.url, body.encode("utf8"))
self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual("@bob4:test", channel.json_body["user_id"]) self.assertEqual("@bob4:test", channel.json_body["user_id"])
@ -440,8 +427,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
) )
want_mac = want_mac.hexdigest() want_mac = want_mac.hexdigest()
body = json.dumps( body = {
{
"nonce": nonce, "nonce": nonce,
"username": "bob", "username": "bob",
"password": "abc123", "password": "abc123",
@ -449,8 +435,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
"user_type": UserTypes.SUPPORT, "user_type": UserTypes.SUPPORT,
"mac": want_mac, "mac": want_mac,
} }
) channel = self.make_request("POST", self.url, body)
channel = self.make_request("POST", self.url, body.encode("utf8"))
self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["user_id"]) self.assertEqual("@bob:test", channel.json_body["user_id"])
@ -993,12 +978,11 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
""" """
If parameter `erase` is not boolean, return an error If parameter `erase` is not boolean, return an error
""" """
body = json.dumps({"erase": "False"})
channel = self.make_request( channel = self.make_request(
"POST", "POST",
self.url, self.url,
content=body.encode(encoding="utf_8"), content={"erase": "False"},
access_token=self.admin_user_tok, access_token=self.admin_user_tok,
) )
@ -2201,7 +2185,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
""" """
channel = self.make_request("GET", self.url, b"{}") channel = self.make_request("GET", self.url, b"{}")
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(401, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_no_admin(self): def test_requester_is_no_admin(self):
@ -2216,7 +2200,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
access_token=other_user_token, access_token=other_user_token,
) )
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(403, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_user_does_not_exist(self): def test_user_does_not_exist(self):
@ -2359,7 +2343,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
""" """
channel = self.make_request("GET", self.url, b"{}") channel = self.make_request("GET", self.url, b"{}")
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(401, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_no_admin(self): def test_requester_is_no_admin(self):
@ -2374,7 +2358,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
access_token=other_user_token, access_token=other_user_token,
) )
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(403, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_user_does_not_exist(self): def test_user_does_not_exist(self):
@ -3073,7 +3057,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
"""Try to login as a user without authentication.""" """Try to login as a user without authentication."""
channel = self.make_request("POST", self.url, b"{}") channel = self.make_request("POST", self.url, b"{}")
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(401, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_not_admin(self): def test_not_admin(self):
@ -3082,7 +3066,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
"POST", self.url, b"{}", access_token=self.other_user_tok "POST", self.url, b"{}", access_token=self.other_user_tok
) )
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(403, channel.code, msg=channel.json_body)
def test_send_event(self): def test_send_event(self):
"""Test that sending event as a user works.""" """Test that sending event as a user works."""
@ -3127,7 +3111,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
# The puppet token should no longer work # The puppet token should no longer work
channel = self.make_request("GET", "devices", b"{}", access_token=puppet_token) channel = self.make_request("GET", "devices", b"{}", access_token=puppet_token)
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(401, channel.code, msg=channel.json_body)
# .. but the real user's tokens should still work # .. but the real user's tokens should still work
channel = self.make_request( channel = self.make_request(
@ -3160,7 +3144,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
channel = self.make_request( channel = self.make_request(
"GET", "devices", b"{}", access_token=self.other_user_tok "GET", "devices", b"{}", access_token=self.other_user_tok
) )
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(401, channel.code, msg=channel.json_body)
def test_admin_logout_all(self): def test_admin_logout_all(self):
"""Tests that the admin user calling `/logout/all` does expire the """Tests that the admin user calling `/logout/all` does expire the
@ -3181,7 +3165,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
# The puppet token should no longer work # The puppet token should no longer work
channel = self.make_request("GET", "devices", b"{}", access_token=puppet_token) channel = self.make_request("GET", "devices", b"{}", access_token=puppet_token)
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(401, channel.code, msg=channel.json_body)
# .. but the real user's tokens should still work # .. but the real user's tokens should still work
channel = self.make_request( channel = self.make_request(
@ -3242,6 +3226,13 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
self.helper.join(room_id, user=self.other_user, tok=puppet_token) self.helper.join(room_id, user=self.other_user, tok=puppet_token)
@parameterized_class(
("url_prefix",),
[
("/_synapse/admin/v1/whois/%s",),
("/_matrix/client/r0/admin/whois/%s",),
],
)
class WhoisRestTestCase(unittest.HomeserverTestCase): class WhoisRestTestCase(unittest.HomeserverTestCase):
servlets = [ servlets = [
@ -3254,21 +3245,14 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.admin_user_tok = self.login("admin", "pass") self.admin_user_tok = self.login("admin", "pass")
self.other_user = self.register_user("user", "pass") self.other_user = self.register_user("user", "pass")
self.url1 = "/_synapse/admin/v1/whois/%s" % urllib.parse.quote(self.other_user) self.url = self.url_prefix % self.other_user
self.url2 = "/_matrix/client/r0/admin/whois/%s" % urllib.parse.quote(
self.other_user
)
def test_no_auth(self): def test_no_auth(self):
""" """
Try to get information of an user without authentication. Try to get information of an user without authentication.
""" """
channel = self.make_request("GET", self.url1, b"{}") channel = self.make_request("GET", self.url, b"{}")
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(401, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
channel = self.make_request("GET", self.url2, b"{}")
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_not_admin(self): def test_requester_is_not_admin(self):
@ -3280,38 +3264,21 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
channel = self.make_request( channel = self.make_request(
"GET", "GET",
self.url1, self.url,
access_token=other_user2_token, access_token=other_user2_token,
) )
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(403, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
channel = self.make_request(
"GET",
self.url2,
access_token=other_user2_token,
)
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_user_is_not_local(self): def test_user_is_not_local(self):
""" """
Tests that a lookup for a user that is not a local returns a 400 Tests that a lookup for a user that is not a local returns a 400
""" """
url1 = "/_synapse/admin/v1/whois/@unknown_person:unknown_domain" url = self.url_prefix % "@unknown_person:unknown_domain"
url2 = "/_matrix/client/r0/admin/whois/@unknown_person:unknown_domain"
channel = self.make_request( channel = self.make_request(
"GET", "GET",
url1, url,
access_token=self.admin_user_tok,
)
self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Can only whois a local user", channel.json_body["error"])
channel = self.make_request(
"GET",
url2,
access_token=self.admin_user_tok, access_token=self.admin_user_tok,
) )
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
@ -3323,16 +3290,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
""" """
channel = self.make_request( channel = self.make_request(
"GET", "GET",
self.url1, self.url,
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(self.other_user, channel.json_body["user_id"])
self.assertIn("devices", channel.json_body)
channel = self.make_request(
"GET",
self.url2,
access_token=self.admin_user_tok, access_token=self.admin_user_tok,
) )
self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(200, channel.code, msg=channel.json_body)
@ -3347,16 +3305,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
channel = self.make_request( channel = self.make_request(
"GET", "GET",
self.url1, self.url,
access_token=other_user_token,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(self.other_user, channel.json_body["user_id"])
self.assertIn("devices", channel.json_body)
channel = self.make_request(
"GET",
self.url2,
access_token=other_user_token, access_token=other_user_token,
) )
self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(200, channel.code, msg=channel.json_body)
@ -3388,7 +3337,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
Try to get information of an user without authentication. Try to get information of an user without authentication.
""" """
channel = self.make_request("POST", self.url) channel = self.make_request("POST", self.url)
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(401, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_not_admin(self): def test_requester_is_not_admin(self):
@ -3398,7 +3347,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
other_user_token = self.login("user", "pass") other_user_token = self.login("user", "pass")
channel = self.make_request("POST", self.url, access_token=other_user_token) channel = self.make_request("POST", self.url, access_token=other_user_token)
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(403, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_user_is_not_local(self): def test_user_is_not_local(self):
@ -3447,66 +3396,41 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
% urllib.parse.quote(self.other_user) % urllib.parse.quote(self.other_user)
) )
def test_no_auth(self): @parameterized.expand(["GET", "POST", "DELETE"])
def test_no_auth(self, method: str):
""" """
Try to get information of a user without authentication. Try to get information of a user without authentication.
""" """
channel = self.make_request("GET", self.url, b"{}") channel = self.make_request(method, self.url, b"{}")
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(401, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
channel = self.make_request("POST", self.url, b"{}") @parameterized.expand(["GET", "POST", "DELETE"])
def test_requester_is_no_admin(self, method: str):
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
channel = self.make_request("DELETE", self.url, b"{}")
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_no_admin(self):
""" """
If the user is not a server admin, an error is returned. If the user is not a server admin, an error is returned.
""" """
other_user_token = self.login("user", "pass") other_user_token = self.login("user", "pass")
channel = self.make_request( channel = self.make_request(
"GET", method,
self.url, self.url,
access_token=other_user_token, access_token=other_user_token,
) )
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(403, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
channel = self.make_request( @parameterized.expand(["GET", "POST", "DELETE"])
"POST", def test_user_does_not_exist(self, method: str):
self.url,
access_token=other_user_token,
)
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
channel = self.make_request(
"DELETE",
self.url,
access_token=other_user_token,
)
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_user_does_not_exist(self):
""" """
Tests that a lookup for a user that does not exist returns a 404 Tests that a lookup for a user that does not exist returns a 404
""" """
url = "/_synapse/admin/v1/users/@unknown_person:test/override_ratelimit" url = "/_synapse/admin/v1/users/@unknown_person:test/override_ratelimit"
channel = self.make_request( channel = self.make_request(
"GET", method,
url, url,
access_token=self.admin_user_tok, access_token=self.admin_user_tok,
) )
@ -3514,25 +3438,14 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(404, channel.code, msg=channel.json_body) self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"]) self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
channel = self.make_request( @parameterized.expand(
"POST", [
url, ("GET", "Can only look up local users"),
access_token=self.admin_user_tok, ("POST", "Only local users can be ratelimited"),
("DELETE", "Only local users can be ratelimited"),
]
) )
def test_user_is_not_local(self, method: str, error_msg: str):
self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
channel = self.make_request(
"DELETE",
url,
access_token=self.admin_user_tok,
)
self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_user_is_not_local(self):
""" """
Tests that a lookup for a user that is not a local returns a 400 Tests that a lookup for a user that is not a local returns a 400
""" """
@ -3541,35 +3454,13 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
) )
channel = self.make_request( channel = self.make_request(
"GET", method,
url, url,
access_token=self.admin_user_tok, access_token=self.admin_user_tok,
) )
self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Can only look up local users", channel.json_body["error"]) self.assertEqual(error_msg, channel.json_body["error"])
channel = self.make_request(
"POST",
url,
access_token=self.admin_user_tok,
)
self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual(
"Only local users can be ratelimited", channel.json_body["error"]
)
channel = self.make_request(
"DELETE",
url,
access_token=self.admin_user_tok,
)
self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual(
"Only local users can be ratelimited", channel.json_body["error"]
)
def test_invalid_parameter(self): def test_invalid_parameter(self):
""" """