0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-17 18:53:57 +01:00
synapse/contrib/scripts/kick_users.py
cclauss 82d9d524bd Fix seven contrib files with Python syntax errors (#5446)
* Fix seven contrib files with Python syntax errors

Signed-off-by: cclauss <cclauss@me.com>
2019-06-18 03:21:30 +10:00

99 lines
3.3 KiB
Python
Executable file

#!/usr/bin/env python
from __future__ import print_function
from argparse import ArgumentParser
import json
import requests
import sys
import urllib
try:
raw_input
except NameError: # Python 3
raw_input = input
def _mkurl(template, kws):
for key in kws:
template = template.replace(key, kws[key])
return template
def main(hs, room_id, access_token, user_id_prefix, why):
if not why:
why = "Automated kick."
print("Kicking members on %s in room %s matching %s" % (hs, room_id, user_id_prefix))
room_state_url = _mkurl(
"$HS/_matrix/client/api/v1/rooms/$ROOM/state?access_token=$TOKEN",
{
"$HS": hs,
"$ROOM": room_id,
"$TOKEN": access_token
}
)
print("Getting room state => %s" % room_state_url)
res = requests.get(room_state_url)
print("HTTP %s" % res.status_code)
state_events = res.json()
if "error" in state_events:
print("FATAL")
print(state_events)
return
kick_list = []
room_name = room_id
for event in state_events:
if not event["type"] == "m.room.member":
if event["type"] == "m.room.name":
room_name = event["content"].get("name")
continue
if not event["content"].get("membership") == "join":
continue
if event["state_key"].startswith(user_id_prefix):
kick_list.append(event["state_key"])
if len(kick_list) == 0:
print("No user IDs match the prefix '%s'" % user_id_prefix)
return
print("The following user IDs will be kicked from %s" % room_name)
for uid in kick_list:
print(uid)
doit = raw_input("Continue? [Y]es\n")
if len(doit) > 0 and doit.lower() == 'y':
print("Kicking members...")
# encode them all
kick_list = [urllib.quote(uid) for uid in kick_list]
for uid in kick_list:
kick_url = _mkurl(
"$HS/_matrix/client/api/v1/rooms/$ROOM/state/m.room.member/$UID?access_token=$TOKEN",
{
"$HS": hs,
"$UID": uid,
"$ROOM": room_id,
"$TOKEN": access_token
}
)
kick_body = {
"membership": "leave",
"reason": why
}
print("Kicking %s" % uid)
res = requests.put(kick_url, data=json.dumps(kick_body))
if res.status_code != 200:
print("ERROR: HTTP %s" % res.status_code)
if res.json().get("error"):
print("ERROR: JSON %s" % res.json())
if __name__ == "__main__":
parser = ArgumentParser("Kick members in a room matching a certain user ID prefix.")
parser.add_argument("-u","--user-id",help="The user ID prefix e.g. '@irc_'")
parser.add_argument("-t","--token",help="Your access_token")
parser.add_argument("-r","--room",help="The room ID to kick members in")
parser.add_argument("-s","--homeserver",help="The base HS url e.g. http://matrix.org")
parser.add_argument("-w","--why",help="Reason for the kick. Optional.")
args = parser.parse_args()
if not args.room or not args.token or not args.user_id or not args.homeserver:
parser.print_help()
sys.exit(1)
else:
main(args.homeserver, args.room, args.token, args.user_id, args.why)