forked from MirrorHub/synapse
Glue AS work to general event notifications. Add more exception handling when poking ASes.
This commit is contained in:
parent
bc658907f0
commit
51d63ac329
4 changed files with 42 additions and 16 deletions
|
@ -62,7 +62,7 @@ class ApplicationService(object):
|
||||||
|
|
||||||
def _matches_regex(self, test_string, namespace_key):
|
def _matches_regex(self, test_string, namespace_key):
|
||||||
if not isinstance(test_string, basestring):
|
if not isinstance(test_string, basestring):
|
||||||
logger.warning(
|
logger.error(
|
||||||
"Expected a string to test regex against, but got %s",
|
"Expected a string to test regex against, but got %s",
|
||||||
test_string
|
test_string
|
||||||
)
|
)
|
||||||
|
|
|
@ -46,6 +46,9 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||||
defer.returnValue(False)
|
defer.returnValue(False)
|
||||||
return
|
return
|
||||||
logger.warning("query_user to %s received %s", uri, e.code)
|
logger.warning("query_user to %s received %s", uri, e.code)
|
||||||
|
except Exception as ex:
|
||||||
|
logger.warning("query_user to %s threw exception %s", uri, ex)
|
||||||
|
defer.returnValue(False)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def query_alias(self, service, alias):
|
def query_alias(self, service, alias):
|
||||||
|
@ -62,6 +65,10 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||||
defer.returnValue(False)
|
defer.returnValue(False)
|
||||||
return
|
return
|
||||||
logger.warning("query_alias to %s received %s", uri, e.code)
|
logger.warning("query_alias to %s received %s", uri, e.code)
|
||||||
|
except Exception as ex:
|
||||||
|
logger.warning("query_alias to %s threw exception %s", uri, ex)
|
||||||
|
defer.returnValue(False)
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def push_bulk(self, service, events):
|
def push_bulk(self, service, events):
|
||||||
|
@ -82,7 +89,9 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||||
defer.returnValue(True)
|
defer.returnValue(True)
|
||||||
except CodeMessageException as e:
|
except CodeMessageException as e:
|
||||||
logger.warning("push_bulk to %s received %s", uri, e.code)
|
logger.warning("push_bulk to %s received %s", uri, e.code)
|
||||||
defer.returnValue(False)
|
except Exception as ex:
|
||||||
|
logger.warning("push_bulk to %s threw exception %s", uri, ex)
|
||||||
|
defer.returnValue(False)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def push(self, service, event):
|
def push(self, service, event):
|
||||||
|
|
|
@ -23,7 +23,6 @@ import synapse.util.stringutils as stringutils
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -58,6 +57,7 @@ class ApplicationServicesHandler(object):
|
||||||
yield self.store.update_app_service(app_service)
|
yield self.store.update_app_service(app_service)
|
||||||
defer.returnValue(app_service)
|
defer.returnValue(app_service)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def unregister(self, token):
|
def unregister(self, token):
|
||||||
logger.info("Unregister as_token=%s", token)
|
logger.info("Unregister as_token=%s", token)
|
||||||
yield self.store.unregister_app_service(token)
|
yield self.store.unregister_app_service(token)
|
||||||
|
@ -81,34 +81,45 @@ class ApplicationServicesHandler(object):
|
||||||
# all services which match that user regex.
|
# all services which match that user regex.
|
||||||
unknown_user = yield self._is_unknown_user(event.sender)
|
unknown_user = yield self._is_unknown_user(event.sender)
|
||||||
if unknown_user:
|
if unknown_user:
|
||||||
user_query_services = yield self._get_services_for_event(
|
yield self.query_user_exists(event)
|
||||||
event=event,
|
|
||||||
restrict_to=ApplicationService.NS_USERS
|
|
||||||
)
|
|
||||||
for user_service in user_query_services:
|
|
||||||
# this needs to block XXX: Need to feed response back to caller
|
|
||||||
is_known_user = yield self.appservice_api.query_user(
|
|
||||||
user_service, event.sender
|
|
||||||
)
|
|
||||||
if is_known_user:
|
|
||||||
# the user exists now,so don't query more ASes.
|
|
||||||
break
|
|
||||||
|
|
||||||
# Fork off pushes to these services - XXX First cut, best effort
|
# Fork off pushes to these services - XXX First cut, best effort
|
||||||
for service in services:
|
for service in services:
|
||||||
self.appservice_api.push(service, event)
|
self.appservice_api.push(service, event)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def query_user_exists(self, event):
|
||||||
|
"""Check if an application services knows this event.sender exists.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event: An event sent by the user to query
|
||||||
|
Returns:
|
||||||
|
True if this user exists.
|
||||||
|
"""
|
||||||
|
# TODO Would be nice for this to accept a user ID instead of an event.
|
||||||
|
user_query_services = yield self._get_services_for_event(
|
||||||
|
event=event,
|
||||||
|
restrict_to=ApplicationService.NS_USERS
|
||||||
|
)
|
||||||
|
for user_service in user_query_services:
|
||||||
|
is_known_user = yield self.appservice_api.query_user(
|
||||||
|
user_service, event.sender
|
||||||
|
)
|
||||||
|
if is_known_user:
|
||||||
|
defer.returnValue(True)
|
||||||
|
defer.returnValue(False)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def query_room_alias_exists(self, room_alias):
|
def query_room_alias_exists(self, room_alias):
|
||||||
"""Check if an application service knows this room alias exists.
|
"""Check if an application service knows this room alias exists.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
room_alias(str): The room alias to query.
|
room_alias(RoomAlias): The room alias to query.
|
||||||
Returns:
|
Returns:
|
||||||
namedtuple: with keys "room_id" and "servers" or None if no
|
namedtuple: with keys "room_id" and "servers" or None if no
|
||||||
association can be found.
|
association can be found.
|
||||||
"""
|
"""
|
||||||
|
room_alias = room_alias.to_string()
|
||||||
alias_query_services = yield self._get_services_for_event(
|
alias_query_services = yield self._get_services_for_event(
|
||||||
event=None,
|
event=None,
|
||||||
restrict_to=ApplicationService.NS_ALIASES,
|
restrict_to=ApplicationService.NS_ALIASES,
|
||||||
|
|
|
@ -99,6 +99,12 @@ class Notifier(object):
|
||||||
`extra_users` param.
|
`extra_users` param.
|
||||||
"""
|
"""
|
||||||
yield run_on_reactor()
|
yield run_on_reactor()
|
||||||
|
|
||||||
|
# poke any interested application service.
|
||||||
|
self.hs.get_handlers().appservice_handler.notify_interested_services(
|
||||||
|
event
|
||||||
|
)
|
||||||
|
|
||||||
room_id = event.room_id
|
room_id = event.room_id
|
||||||
|
|
||||||
room_source = self.event_sources.sources["room"]
|
room_source = self.event_sources.sources["room"]
|
||||||
|
|
Loading…
Reference in a new issue