0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-14 03:33:48 +01:00

Merge branch 'develop' into notifier_performance

This commit is contained in:
Mark Haines 2015-05-18 13:50:01 +01:00
commit 0b0033c40b
10 changed files with 279 additions and 105 deletions

View file

@ -32,9 +32,9 @@ from synapse.server import HomeServer
from twisted.internet import reactor
from twisted.application import service
from twisted.enterprise import adbapi
from twisted.web.resource import Resource
from twisted.web.resource import Resource, EncodingResourceWrapper
from twisted.web.static import File
from twisted.web.server import Site
from twisted.web.server import Site, GzipEncoderFactory
from twisted.web.http import proxiedLogFormatter, combinedLogFormatter
from synapse.http.server import JsonResource, RootRedirect
from synapse.rest.media.v0.content_repository import ContentRepoResource
@ -69,16 +69,26 @@ import subprocess
logger = logging.getLogger("synapse.app.homeserver")
class GzipFile(File):
def getChild(self, path, request):
child = File.getChild(self, path, request)
return EncodingResourceWrapper(child, [GzipEncoderFactory()])
def gz_wrap(r):
return EncodingResourceWrapper(r, [GzipEncoderFactory()])
class SynapseHomeServer(HomeServer):
def build_http_client(self):
return MatrixFederationHttpClient(self)
def build_resource_for_client(self):
return ClientV1RestResource(self)
return gz_wrap(ClientV1RestResource(self))
def build_resource_for_client_v2_alpha(self):
return ClientV2AlphaRestResource(self)
return gz_wrap(ClientV2AlphaRestResource(self))
def build_resource_for_federation(self):
return JsonResource(self)
@ -87,9 +97,10 @@ class SynapseHomeServer(HomeServer):
import syweb
syweb_path = os.path.dirname(syweb.__file__)
webclient_path = os.path.join(syweb_path, "webclient")
return File(webclient_path) # TODO configurable?
return GzipFile(webclient_path) # TODO configurable?
def build_resource_for_static_content(self):
# This is old and should go away: not going to bother adding gzip
return File("static")
def build_resource_for_content_repo(self):

View file

@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes
from synapse.types import RoomAlias
import logging
import string
logger = logging.getLogger(__name__)
@ -40,6 +41,10 @@ class DirectoryHandler(BaseHandler):
def _create_association(self, room_alias, room_id, servers=None):
# general association creation for both human users and app services
for wchar in string.whitespace:
if wchar in room_alias.localpart:
raise SynapseError(400, "Invalid characters in room alias")
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room alias must be local")
# TODO(erikj): Change this.

View file

@ -317,6 +317,14 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def user_joined_room(self, user, room_id):
"""Called via the distributor whenever a user joins a room.
Notifies the new member of the presence of the current members.
Notifies the current members of the room of the new member's presence.
Args:
user(UserID): The user who joined the room.
room_id(str): The room id the user joined.
"""
if self.hs.is_mine(user):
statuscache = self._get_or_make_usercache(user)
@ -346,6 +354,7 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def send_invite(self, observer_user, observed_user):
"""Request the presence of a local or remote user for a local user"""
if not self.hs.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
@ -380,6 +389,15 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def invite_presence(self, observed_user, observer_user):
"""Handles a m.presence_invite EDU. A remote or local user has
requested presence updates for a local user. If the invite is accepted
then allow the local or remote user to see the presence of the local
user.
Args:
observed_user(UserID): The local user whose presence is requested.
observer_user(UserID): The remote or local user requesting presence.
"""
accept = yield self._should_accept_invite(observed_user, observer_user)
if accept:
@ -406,6 +424,14 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def accept_presence(self, observed_user, observer_user):
"""Handles a m.presence_accept EDU. Mark a presence invite from a
local or remote user as accepted in a local user's presence list.
Starts polling for presence updates from the local or remote user.
Args:
observed_user(UserID): The user to update in the presence list.
observer_user(UserID): The owner of the presence list to update.
"""
yield self.store.set_presence_list_accepted(
observer_user.localpart, observed_user.to_string()
)
@ -416,6 +442,16 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def deny_presence(self, observed_user, observer_user):
"""Handle a m.presence_deny EDU. Removes a local or remote user from a
local user's presence list.
Args:
observed_user(UserID): The local or remote user to remove from the
list.
observer_user(UserID): The local owner of the presence list.
Returns:
A Deferred.
"""
yield self.store.del_presence_list(
observer_user.localpart, observed_user.to_string()
)
@ -424,6 +460,16 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def drop(self, observed_user, observer_user):
"""Remove a local or remote user from a local user's presence list and
unsubscribe the local user from updates that user.
Args:
observed_user(UserId): The local or remote user to remove from the
list.
observer_user(UserId): The local owner of the presence list.
Returns:
A Deferred.
"""
if not self.hs.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
@ -437,6 +483,16 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
"""Get the presence list for a local user. The retured list includes
the current presence state for each user listed.
Args:
observer_user(UserID): The local user whose presence list to fetch.
accepted(bool or None): If not none then only include users who
have or have not accepted the presence invite request.
Returns:
A Deferred list of presence state events.
"""
if not self.hs.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
@ -458,6 +514,23 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def start_polling_presence(self, user, target_user=None, state=None):
"""Subscribe a local user to presence updates from a local or remote
user. If no target_user is supplied then subscribe to all users stored
in the presence list for the local user.
Additonally this pushes the current presence state of this user to all
target_users. That state can be provided directly or will be read from
the stored state for the local user.
Also this attempts to notify the local user of the current state of
any local target users.
Args:
user(UserID): The local user that whishes for presence updates.
target_user(UserID): The local or remote user whose updates are
wanted.
state(dict): Optional presence state for the local user.
"""
logger.debug("Start polling for presence from %s", user)
if target_user:
@ -498,9 +571,7 @@ class PresenceHandler(BaseHandler):
# We want to tell the person that just came online
# presence state of people they are interested in?
self.push_update_to_clients(
observed_user=target_user,
users_to_push=[user],
statuscache=self._get_or_offline_usercache(target_user),
)
deferreds = []
@ -517,6 +588,12 @@ class PresenceHandler(BaseHandler):
yield defer.DeferredList(deferreds, consumeErrors=True)
def _start_polling_local(self, user, target_user):
"""Subscribe a local user to presence updates for a local user
Args:
user(UserId): The local user that wishes for updates.
target_user(UserId): The local users whose updates are wanted.
"""
target_localpart = target_user.localpart
if target_localpart not in self._local_pushmap:
@ -525,6 +602,17 @@ class PresenceHandler(BaseHandler):
self._local_pushmap[target_localpart].add(user)
def _start_polling_remote(self, user, domain, remoteusers):
"""Subscribe a local user to presence updates for remote users on a
given remote domain.
Args:
user(UserID): The local user that wishes for updates.
domain(str): The remote server the local user wants updates from.
remoteusers(UserID): The remote users that local user wants to be
told about.
Returns:
A Deferred.
"""
to_poll = set()
for u in remoteusers:
@ -545,6 +633,17 @@ class PresenceHandler(BaseHandler):
@log_function
def stop_polling_presence(self, user, target_user=None):
"""Unsubscribe a local user from presence updates from a local or
remote user. If no target user is supplied then unsubscribe the user
from all presence updates that the user had subscribed to.
Args:
user(UserID): The local user that no longer wishes for updates.
target_user(UserID or None): The user whose updates are no longer
wanted.
Returns:
A Deferred.
"""
logger.debug("Stop polling for presence from %s", user)
if not target_user or self.hs.is_mine(target_user):
@ -573,6 +672,13 @@ class PresenceHandler(BaseHandler):
return defer.DeferredList(deferreds, consumeErrors=True)
def _stop_polling_local(self, user, target_user):
"""Unsubscribe a local user from presence updates from a local user on
this server.
Args:
user(UserID): The local user that no longer wishes for updates.
target_user(UserID): The user whose updates are no longer wanted.
"""
for localpart in self._local_pushmap.keys():
if target_user and localpart != target_user.localpart:
continue
@ -585,6 +691,17 @@ class PresenceHandler(BaseHandler):
@log_function
def _stop_polling_remote(self, user, domain, remoteusers):
"""Unsubscribe a local user from presence updates from remote users on
a given domain.
Args:
user(UserID): The local user that no longer wishes for updates.
domain(str): The remote server to unsubscribe from.
remoteusers([UserID]): The users on that remote server that the
local user no longer wishes to be updated about.
Returns:
A Deferred.
"""
to_unpoll = set()
for u in remoteusers:
@ -606,6 +723,19 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def push_presence(self, user, statuscache):
"""
Notify local and remote users of a change in presence of a local user.
Pushes the update to local clients and remote domains that are directly
subscribed to the presence of the local user.
Also pushes that update to any local user or remote domain that shares
a room with the local user.
Args:
user(UserID): The local user whose presence was updated.
statuscache(UserPresenceCache): Cache of the user's presence state
Returns:
A Deferred.
"""
assert(self.hs.is_mine(user))
logger.debug("Pushing presence update from %s", user)
@ -632,45 +762,24 @@ class PresenceHandler(BaseHandler):
)
yield self.distributor.fire("user_presence_changed", user, statuscache)
@defer.inlineCallbacks
def _push_presence_remote(self, user, destination, state=None):
if state is None:
state = yield self.store.get_presence_state(user.localpart)
del state["mtime"]
state["presence"] = state.pop("state")
if user in self._user_cachemap:
state["last_active"] = (
self._user_cachemap[user].get_state()["last_active"]
)
yield self.distributor.fire(
"collect_presencelike_data", user, state
)
if "last_active" in state:
state = dict(state)
state["last_active_ago"] = int(
self.clock.time_msec() - state.pop("last_active")
)
user_state = {
"user_id": user.to_string(),
}
user_state.update(**state)
yield self.federation.send_edu(
destination=destination,
edu_type="m.presence",
content={
"push": [
user_state,
],
}
)
@defer.inlineCallbacks
def incoming_presence(self, origin, content):
"""Handle an incoming m.presence EDU.
For each presence update in the "push" list update our local cache and
notify the appropriate local clients. Only clients that share a room
or are directly subscribed to the presence for a user should be
notified of the update.
For each subscription request in the "poll" list start pushing presence
updates to the remote server.
For unsubscribe request in the "unpoll" list stop pushing presence
updates to the remote server.
Args:
orgin(str): The source of this m.presence EDU.
content(dict): The content of this m.presence EDU.
Returns:
A Deferred.
"""
deferreds = []
for push in content.get("push", []):
@ -714,10 +823,7 @@ class PresenceHandler(BaseHandler):
continue
self.push_update_to_clients(
observed_user=user,
users_to_push=observers,
room_ids=room_ids,
statuscache=statuscache,
users_to_push=observers, room_ids=room_ids
)
user_id = user.to_string()
@ -772,6 +878,23 @@ class PresenceHandler(BaseHandler):
def push_update_to_local_and_remote(self, observed_user, statuscache,
users_to_push=[], room_ids=[],
remote_domains=[]):
"""Notify local clients and remote servers of a change in the presence
of a user.
Args:
observed_user(UserID): The user to push the presence state for.
statuscache(UserPresenceCache): The cache for the presence state to
push.
users_to_push([UserID]): A list of local and remote users to
notify.
room_ids([str]): Notify the local and remote occupants of these
rooms.
remote_domains([str]): A list of remote servers to notify in
addition to those implied by the users_to_push and the
room_ids.
Returns:
A Deferred.
"""
localusers, remoteusers = partitionbool(
users_to_push,
@ -781,10 +904,7 @@ class PresenceHandler(BaseHandler):
localusers = set(localusers)
self.push_update_to_clients(
observed_user=observed_user,
users_to_push=localusers,
room_ids=room_ids,
statuscache=statuscache,
users_to_push=localusers, room_ids=room_ids
)
remote_domains = set(remote_domains)
@ -809,8 +929,13 @@ class PresenceHandler(BaseHandler):
defer.returnValue((localusers, remote_domains))
def push_update_to_clients(self, observed_user, users_to_push=[],
room_ids=[], statuscache=None):
def push_update_to_clients(self, users_to_push=[], room_ids=[]):
"""Notify clients of a new presence event.
Args:
users_to_push([UserID]): List of users to notify.
room_ids([str]): List of room_ids to notify.
"""
with PreserveLoggingContext():
self.notifier.on_new_user_event(
"presence_key",
@ -819,6 +944,52 @@ class PresenceHandler(BaseHandler):
room_ids,
)
@defer.inlineCallbacks
def _push_presence_remote(self, user, destination, state=None):
"""Push a user's presence to a remote server. If a presence state event
that event is sent. Otherwise a new state event is constructed from the
stored presence state.
The last_active is replaced with last_active_ago in case the wallclock
time on the remote server is different to the time on this server.
Sends an EDU to the remote server with the current presence state.
Args:
user(UserID): The user to push the presence state for.
destination(str): The remote server to send state to.
state(dict): The state to push, or None to use the current stored
state.
Returns:
A Deferred.
"""
if state is None:
state = yield self.store.get_presence_state(user.localpart)
del state["mtime"]
state["presence"] = state.pop("state")
if user in self._user_cachemap:
state["last_active"] = (
self._user_cachemap[user].get_state()["last_active"]
)
yield self.distributor.fire(
"collect_presencelike_data", user, state
)
if "last_active" in state:
state = dict(state)
state["last_active_ago"] = int(
self.clock.time_msec() - state.pop("last_active")
)
user_state = {"user_id": user.to_string(), }
user_state.update(state)
yield self.federation.send_edu(
destination=destination,
edu_type="m.presence",
content={"push": [user_state, ], }
)
class PresenceEventSource(object):
def __init__(self, hs):

View file

@ -88,6 +88,9 @@ class ProfileHandler(BaseHandler):
if target_user != auth_user:
raise AuthError(400, "Cannot set another user's displayname")
if new_displayname == '':
new_displayname = None
yield self.store.set_profile_displayname(
target_user.localpart, new_displayname
)

View file

@ -26,6 +26,7 @@ from synapse.util.async import run_on_reactor
from synapse.events.utils import serialize_event
import logging
import string
logger = logging.getLogger(__name__)
@ -50,6 +51,10 @@ class RoomCreationHandler(BaseHandler):
self.ratelimit(user_id)
if "room_alias_name" in config:
for wchar in string.whitespace:
if wchar in config["room_alias_name"]:
raise SynapseError(400, "Invalid characters in room alias")
room_alias = RoomAlias.create(
config["room_alias_name"],
self.hs.hostname,

View file

@ -82,8 +82,10 @@ class RegisterRestServlet(RestServlet):
[LoginType.EMAIL_IDENTITY]
]
result = None
if service:
is_application_server = True
params = body
elif 'mac' in body:
# Check registration-specific shared secret auth
if 'username' not in body:
@ -92,6 +94,7 @@ class RegisterRestServlet(RestServlet):
body['username'], body['mac']
)
is_using_shared_secret = True
params = body
else:
authed, result, params = yield self.auth_handler.check_auth(
flows, body, self.hs.get_ip_from_request(request)
@ -118,7 +121,7 @@ class RegisterRestServlet(RestServlet):
password=new_password
)
if LoginType.EMAIL_IDENTITY in result:
if result and LoginType.EMAIL_IDENTITY in result:
threepid = result[LoginType.EMAIL_IDENTITY]
for reqd in ['medium', 'address', 'validated_at']:

View file

@ -309,6 +309,7 @@ class SQLBaseStore(object):
self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
self._pushers_id_gen = IdGenerator("pushers", "id", self)
self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()

View file

@ -204,11 +204,21 @@ class PushRuleStore(SQLBaseStore):
@defer.inlineCallbacks
def set_push_rule_enabled(self, user_name, rule_id, enabled):
yield self._simple_upsert(
ret = yield self.runInteraction(
"_set_push_rule_enabled_txn",
self._set_push_rule_enabled_txn,
user_name, rule_id, enabled
)
defer.returnValue(ret)
def _set_push_rule_enabled_txn(self, txn, user_name, rule_id, enabled):
new_id = self._push_rules_enable_id_gen.get_next_txn(txn)
self._simple_upsert_txn(
txn,
PushRuleEnableTable.table_name,
{'user_name': user_name, 'rule_id': rule_id},
{'enabled': 1 if enabled else 0},
desc="set_push_rule_enabled",
{'id': new_id},
)

View file

@ -1097,12 +1097,8 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase):
# apple should see both banana and clementine currently offline
self.mock_update_client.assert_has_calls([
call(users_to_push=[self.u_apple],
observed_user=self.u_banana,
statuscache=ANY),
call(users_to_push=[self.u_apple],
observed_user=self.u_clementine,
statuscache=ANY),
call(users_to_push=[self.u_apple]),
call(users_to_push=[self.u_apple]),
], any_order=True)
# Gut-wrenching tests
@ -1121,13 +1117,8 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase):
# apple and banana should now both see each other online
self.mock_update_client.assert_has_calls([
call(users_to_push=set([self.u_apple]),
observed_user=self.u_banana,
room_ids=[],
statuscache=ANY),
call(users_to_push=[self.u_banana],
observed_user=self.u_apple,
statuscache=ANY),
call(users_to_push=set([self.u_apple]), room_ids=[]),
call(users_to_push=[self.u_banana]),
], any_order=True)
self.assertTrue("apple" in self.handler._local_pushmap)
@ -1143,10 +1134,7 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase):
# banana should now be told apple is offline
self.mock_update_client.assert_has_calls([
call(users_to_push=set([self.u_banana, self.u_apple]),
observed_user=self.u_apple,
room_ids=[],
statuscache=ANY),
call(users_to_push=set([self.u_banana, self.u_apple]), room_ids=[]),
], any_order=True)
self.assertFalse("banana" in self.handler._local_pushmap)

View file

@ -209,20 +209,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
], presence)
self.mock_update_client.assert_has_calls([
call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
room_ids=[],
observed_user=self.u_apple,
statuscache=ANY), # self-reflection
call(
users_to_push={self.u_apple, self.u_banana, self.u_clementine},
room_ids=[]
),
], any_order=True)
statuscache = self.mock_update_client.call_args[1]["statuscache"]
self.assertEquals({
"presence": ONLINE,
"last_active": 1000000, # MockClock
"displayname": "Frank",
"avatar_url": "http://foo",
}, statuscache.state)
self.mock_update_client.reset_mock()
self.datastore.set_profile_displayname.return_value = defer.succeed(
@ -232,21 +224,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
self.u_apple, "I am an Apple")
self.mock_update_client.assert_has_calls([
call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
call(
users_to_push={self.u_apple, self.u_banana, self.u_clementine},
room_ids=[],
observed_user=self.u_apple,
statuscache=ANY), # self-reflection
),
], any_order=True)
statuscache = self.mock_update_client.call_args[1]["statuscache"]
self.assertEquals({
"presence": ONLINE,
"last_active": 1000000, # MockClock
"displayname": "I am an Apple",
"avatar_url": "http://foo",
}, statuscache.state)
@defer.inlineCallbacks
def test_push_remote(self):
self.presence_list = [
@ -314,13 +297,7 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
self.mock_update_client.assert_called_with(
users_to_push=set([self.u_apple]),
room_ids=[],
observed_user=self.u_potato,
statuscache=ANY)
statuscache = self.mock_update_client.call_args[1]["statuscache"]
self.assertEquals({"presence": ONLINE,
"displayname": "Frank",
"avatar_url": "http://foo"}, statuscache.state)
)
state = yield self.handlers.presence_handler.get_state(self.u_potato,
self.u_apple)