mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-17 07:21:37 +01:00
Remove more reg/unreg methods. Read config not database for cache.
This commit is contained in:
parent
e7887e37a8
commit
d33ae65efc
3 changed files with 58 additions and 235 deletions
|
@ -16,10 +16,8 @@
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import Codes, StoreError, SynapseError
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.types import UserID
|
||||
import synapse.util.stringutils as stringutils
|
||||
|
||||
import logging
|
||||
|
||||
|
@ -49,38 +47,6 @@ class ApplicationServicesHandler(object):
|
|||
self.scheduler = appservice_scheduler
|
||||
self.started_scheduler = False
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def register(self, app_service):
|
||||
logger.info("Register -> %s", app_service)
|
||||
# check the token is recognised
|
||||
try:
|
||||
stored_service = yield self.store.get_app_service_by_token(
|
||||
app_service.token
|
||||
)
|
||||
if not stored_service:
|
||||
raise StoreError(404, "Application service not found")
|
||||
app_service.id = stored_service.id
|
||||
except StoreError:
|
||||
raise SynapseError(
|
||||
403, "Unrecognised application services token. "
|
||||
"Consult the home server admin.",
|
||||
errcode=Codes.FORBIDDEN
|
||||
)
|
||||
app_service.hs_token = self._generate_hs_token()
|
||||
|
||||
# create a sender for this application service which is used when
|
||||
# creating rooms, etc..
|
||||
account = yield self.hs.get_handlers().registration_handler.register()
|
||||
app_service.sender = account[0]
|
||||
|
||||
yield self.store.update_app_service(app_service)
|
||||
defer.returnValue(app_service)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def unregister(self, token):
|
||||
logger.info("Unregister as_token=%s", token)
|
||||
yield self.store.unregister_app_service(token)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def notify_interested_services(self, event):
|
||||
"""Notifies (pushes) all application services interested in this event.
|
||||
|
@ -223,6 +189,3 @@ class ApplicationServicesHandler(object):
|
|||
exists = yield self.query_user_exists(user_id)
|
||||
defer.returnValue(exists)
|
||||
defer.returnValue(True)
|
||||
|
||||
def _generate_hs_token(self):
|
||||
return stringutils.random_string(24)
|
||||
|
|
|
@ -13,12 +13,12 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import yaml
|
||||
from simplejson import JSONDecodeError
|
||||
import simplejson as json
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import Membership
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.appservice import ApplicationService, AppServiceTransaction
|
||||
from synapse.storage.roommember import RoomsForUser
|
||||
from ._base import SQLBaseStore
|
||||
|
@ -27,141 +27,18 @@ from ._base import SQLBaseStore
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def log_failure(failure):
|
||||
logger.error("Failed to detect application services: %s", failure.value)
|
||||
logger.error(failure.getTraceback())
|
||||
|
||||
|
||||
class ApplicationServiceStore(SQLBaseStore):
|
||||
|
||||
def __init__(self, hs):
|
||||
super(ApplicationServiceStore, self).__init__(hs)
|
||||
self.services_cache = []
|
||||
self.cache_defer = self._populate_appservice_cache()
|
||||
self.cache_defer.addErrback(log_failure)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def unregister_app_service(self, token):
|
||||
"""Unregisters this service.
|
||||
|
||||
This removes all AS specific regex and the base URL. The token is the
|
||||
only thing preserved for future registration attempts.
|
||||
"""
|
||||
yield self.cache_defer # make sure the cache is ready
|
||||
yield self.runInteraction(
|
||||
"unregister_app_service",
|
||||
self._unregister_app_service_txn,
|
||||
token,
|
||||
)
|
||||
# update cache TODO: Should this be in the txn?
|
||||
for service in self.services_cache:
|
||||
if service.token == token:
|
||||
service.url = None
|
||||
service.namespaces = None
|
||||
service.hs_token = None
|
||||
|
||||
def _unregister_app_service_txn(self, txn, token):
|
||||
# kill the url to prevent pushes
|
||||
txn.execute(
|
||||
"UPDATE application_services SET url=NULL WHERE token=?",
|
||||
(token,)
|
||||
self._populate_appservice_cache(
|
||||
hs.config.app_service_config_files
|
||||
)
|
||||
|
||||
# cleanup regex
|
||||
as_id = self._get_as_id_txn(txn, token)
|
||||
if not as_id:
|
||||
logger.warning(
|
||||
"unregister_app_service_txn: Failed to find as_id for token=",
|
||||
token
|
||||
)
|
||||
return False
|
||||
|
||||
txn.execute(
|
||||
"DELETE FROM application_services_regex WHERE as_id=?",
|
||||
(as_id,)
|
||||
)
|
||||
return True
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_app_service(self, service):
|
||||
"""Update an application service, clobbering what was previously there.
|
||||
|
||||
Args:
|
||||
service(ApplicationService): The updated service.
|
||||
"""
|
||||
yield self.cache_defer # make sure the cache is ready
|
||||
|
||||
# NB: There is no "insert" since we provide no public-facing API to
|
||||
# allocate new ASes. It relies on the server admin inserting the AS
|
||||
# token into the database manually.
|
||||
|
||||
if not service.token or not service.url:
|
||||
raise StoreError(400, "Token and url must be specified.")
|
||||
|
||||
if not service.hs_token:
|
||||
raise StoreError(500, "No HS token")
|
||||
|
||||
as_id = yield self.runInteraction(
|
||||
"update_app_service",
|
||||
self._update_app_service_txn,
|
||||
service
|
||||
)
|
||||
service.id = as_id
|
||||
|
||||
# update cache TODO: Should this be in the txn?
|
||||
for (index, cache_service) in enumerate(self.services_cache):
|
||||
if service.token == cache_service.token:
|
||||
self.services_cache[index] = service
|
||||
logger.info("Updated: %s", service)
|
||||
return
|
||||
# new entry
|
||||
self.services_cache.append(service)
|
||||
logger.info("Updated(new): %s", service)
|
||||
|
||||
def _update_app_service_txn(self, txn, service):
|
||||
as_id = self._get_as_id_txn(txn, service.token)
|
||||
if not as_id:
|
||||
logger.warning(
|
||||
"update_app_service_txn: Failed to find as_id for token=",
|
||||
service.token
|
||||
)
|
||||
return
|
||||
|
||||
txn.execute(
|
||||
"UPDATE application_services SET url=?, hs_token=?, sender=? "
|
||||
"WHERE id=?",
|
||||
(service.url, service.hs_token, service.sender, as_id,)
|
||||
)
|
||||
# cleanup regex
|
||||
txn.execute(
|
||||
"DELETE FROM application_services_regex WHERE as_id=?",
|
||||
(as_id,)
|
||||
)
|
||||
for (ns_int, ns_str) in enumerate(ApplicationService.NS_LIST):
|
||||
if ns_str in service.namespaces:
|
||||
for regex_obj in service.namespaces[ns_str]:
|
||||
txn.execute(
|
||||
"INSERT INTO application_services_regex("
|
||||
"as_id, namespace, regex) values(?,?,?)",
|
||||
(as_id, ns_int, json.dumps(regex_obj))
|
||||
)
|
||||
return as_id
|
||||
|
||||
def _get_as_id_txn(self, txn, token):
|
||||
cursor = txn.execute(
|
||||
"SELECT id FROM application_services WHERE token=?",
|
||||
(token,)
|
||||
)
|
||||
res = cursor.fetchone()
|
||||
if res:
|
||||
return res[0]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_app_services(self):
|
||||
yield self.cache_defer # make sure the cache is ready
|
||||
defer.returnValue(self.services_cache)
|
||||
defer.succeed(self.services_cache)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_app_service_by_user_id(self, user_id):
|
||||
"""Retrieve an application service from their user ID.
|
||||
|
||||
|
@ -175,37 +52,24 @@ class ApplicationServiceStore(SQLBaseStore):
|
|||
Returns:
|
||||
synapse.appservice.ApplicationService or None.
|
||||
"""
|
||||
|
||||
yield self.cache_defer # make sure the cache is ready
|
||||
|
||||
for service in self.services_cache:
|
||||
if service.sender == user_id:
|
||||
defer.returnValue(service)
|
||||
defer.succeed(service)
|
||||
return
|
||||
defer.returnValue(None)
|
||||
defer.succeed(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_app_service_by_token(self, token, from_cache=True):
|
||||
def get_app_service_by_token(self, token):
|
||||
"""Get the application service with the given appservice token.
|
||||
|
||||
Args:
|
||||
token (str): The application service token.
|
||||
from_cache (bool): True to get this service from the cache, False to
|
||||
check the database.
|
||||
Raises:
|
||||
StoreError if there was a problem retrieving this service.
|
||||
Returns:
|
||||
synapse.appservice.ApplicationService or None.
|
||||
"""
|
||||
yield self.cache_defer # make sure the cache is ready
|
||||
|
||||
if from_cache:
|
||||
for service in self.services_cache:
|
||||
if service.token == token:
|
||||
defer.returnValue(service)
|
||||
return
|
||||
defer.returnValue(None)
|
||||
|
||||
# TODO: The from_cache=False impl
|
||||
# TODO: This should be JOINed with the application_services_regex table.
|
||||
for service in self.services_cache:
|
||||
if service.token == token:
|
||||
return defer.succeed(service)
|
||||
defer.succeed(None)
|
||||
|
||||
def get_app_service_rooms(self, service):
|
||||
"""Get a list of RoomsForUser for this application service.
|
||||
|
@ -336,18 +200,53 @@ class ApplicationServiceStore(SQLBaseStore):
|
|||
))
|
||||
return service_list
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_appservice_cache(self):
|
||||
"""Populates the ApplicationServiceCache from the database."""
|
||||
sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN "
|
||||
"application_services_regex AS r ON a.id = r.as_id")
|
||||
def _load_appservice(self, as_info):
|
||||
required_string_fields = ["url", "as_token", "hs_token", "sender"]
|
||||
for field in required_string_fields:
|
||||
if not isinstance(as_info.get(field), basestring):
|
||||
raise KeyError("Required string field: '%s'", field)
|
||||
|
||||
results = yield self._execute_and_decode("appservice_cache", sql)
|
||||
services = self._parse_services_dict(results)
|
||||
# namespace checks
|
||||
if not isinstance(as_info.get("namespaces"), dict):
|
||||
raise KeyError("Requires 'namespaces' object.")
|
||||
for ns in ApplicationService.NS_LIST:
|
||||
# specific namespaces are optional
|
||||
if ns in as_info["namespaces"]:
|
||||
# expect a list of dicts with exclusive and regex keys
|
||||
for regex_obj in as_info["namespaces"][ns]:
|
||||
if not isinstance(regex_obj, dict):
|
||||
raise ValueError(
|
||||
"Expected namespace entry in %s to be an object,"
|
||||
" but got %s", ns, regex_obj
|
||||
)
|
||||
if not isinstance(regex_obj.get("regex"), basestring):
|
||||
raise ValueError(
|
||||
"Missing/bad type 'regex' key in %s", regex_obj
|
||||
)
|
||||
if not isinstance(regex_obj.get("exclusive"), bool):
|
||||
raise ValueError(
|
||||
"Missing/bad type 'exclusive' key in %s", regex_obj
|
||||
)
|
||||
return ApplicationService(
|
||||
token=as_info["as_token"],
|
||||
url=as_info["url"],
|
||||
namespaces=as_info["namespaces"],
|
||||
hs_token=as_info["hs_token"],
|
||||
sender=as_info["sender"]
|
||||
)
|
||||
|
||||
for service in services:
|
||||
logger.info("Found application service: %s", service)
|
||||
self.services_cache.append(service)
|
||||
def _populate_appservice_cache(self, config_files):
|
||||
"""Populates a cache of Application Services from the config files."""
|
||||
for config_file in config_files:
|
||||
try:
|
||||
with open(config_file, 'r') as f:
|
||||
as_info = yaml.load(f)
|
||||
appservice = self._load_appservice(as_info)
|
||||
logger.info("Loaded application service: %s", appservice)
|
||||
self.services_cache.append(appservice)
|
||||
except Exception as e:
|
||||
logger.error("Failed to load appservice from '%s'", config_file)
|
||||
logger.exception(e)
|
||||
|
||||
|
||||
class ApplicationServiceTransactionStore(SQLBaseStore):
|
||||
|
|
|
@ -49,45 +49,6 @@ class ApplicationServiceStoreTestCase(unittest.TestCase):
|
|||
# must be done after inserts
|
||||
self.store = ApplicationServiceStore(hs)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_update_and_retrieval_of_service(self):
|
||||
url = "https://matrix.org/appservices/foobar"
|
||||
hs_token = "hstok"
|
||||
user_regex = [
|
||||
{"regex": "@foobar_.*:matrix.org", "exclusive": True}
|
||||
]
|
||||
alias_regex = [
|
||||
{"regex": "#foobar_.*:matrix.org", "exclusive": False}
|
||||
]
|
||||
room_regex = [
|
||||
|
||||
]
|
||||
service = ApplicationService(
|
||||
url=url, hs_token=hs_token, token=self.as_token, namespaces={
|
||||
ApplicationService.NS_USERS: user_regex,
|
||||
ApplicationService.NS_ALIASES: alias_regex,
|
||||
ApplicationService.NS_ROOMS: room_regex
|
||||
})
|
||||
yield self.store.update_app_service(service)
|
||||
|
||||
stored_service = yield self.store.get_app_service_by_token(
|
||||
self.as_token
|
||||
)
|
||||
self.assertEquals(stored_service.token, self.as_token)
|
||||
self.assertEquals(stored_service.url, url)
|
||||
self.assertEquals(
|
||||
stored_service.namespaces[ApplicationService.NS_ALIASES],
|
||||
alias_regex
|
||||
)
|
||||
self.assertEquals(
|
||||
stored_service.namespaces[ApplicationService.NS_ROOMS],
|
||||
room_regex
|
||||
)
|
||||
self.assertEquals(
|
||||
stored_service.namespaces[ApplicationService.NS_USERS],
|
||||
user_regex
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_retrieve_unknown_service_token(self):
|
||||
service = yield self.store.get_app_service_by_token("invalid_token")
|
||||
|
|
Loading…
Reference in a new issue