Compare commits
3 commits
develop
...
erikj/as_u
Author | SHA1 | Date | |
---|---|---|---|
|
86090eadb0 | ||
|
edbeed06ca | ||
|
277d2c506d |
|
@ -42,7 +42,7 @@ logger = logging.getLogger("synapse.app.appservice")
|
|||
|
||||
|
||||
class AppserviceSlaveStore(
|
||||
DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
|
||||
DirectoryStore, SlavedApplicationServiceStore, SlavedEventStore,
|
||||
SlavedRegistrationStore,
|
||||
):
|
||||
pass
|
||||
|
|
|
@ -50,11 +50,11 @@ logger = logging.getLogger("synapse.app.client_reader")
|
|||
|
||||
|
||||
class ClientReaderSlavedStore(
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedEventStore,
|
||||
SlavedKeyStore,
|
||||
RoomStore,
|
||||
DirectoryStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
TransactionStore,
|
||||
SlavedClientIpStore,
|
||||
|
|
|
@ -64,6 +64,7 @@ logger = logging.getLogger("synapse.app.synchrotron")
|
|||
class SynchrotronSlavedStore(
|
||||
SlavedReceiptsStore,
|
||||
SlavedAccountDataStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedFilteringStore,
|
||||
|
@ -71,7 +72,6 @@ class SynchrotronSlavedStore(
|
|||
SlavedGroupServerStore,
|
||||
SlavedDeviceInboxStore,
|
||||
SlavedDeviceStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedEventStore,
|
||||
SlavedClientIpStore,
|
||||
RoomStore,
|
||||
|
|
|
@ -49,8 +49,8 @@ logger = logging.getLogger("synapse.app.user_dir")
|
|||
|
||||
|
||||
class UserDirectorySlaveStore(
|
||||
SlavedEventStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedEventStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedClientIpStore,
|
||||
UserDirectoryStore,
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.types import GroupID, get_domain_from_id
|
||||
|
||||
from twisted.internet import defer
|
||||
|
@ -173,6 +172,7 @@ class ApplicationService(object):
|
|||
|
||||
if self.is_interested_in_user(event.sender):
|
||||
defer.returnValue(True)
|
||||
|
||||
# also check m.room.member state key
|
||||
if (event.type == EventTypes.Member and
|
||||
self.is_interested_in_user(event.state_key)):
|
||||
|
@ -181,20 +181,18 @@ class ApplicationService(object):
|
|||
if not store:
|
||||
defer.returnValue(False)
|
||||
|
||||
does_match = yield self._matches_user_in_member_list(event.room_id, store)
|
||||
does_match = yield self._matches_user_in_member_list(
|
||||
event, store,
|
||||
)
|
||||
defer.returnValue(does_match)
|
||||
|
||||
@cachedInlineCallbacks(num_args=1, cache_context=True)
|
||||
def _matches_user_in_member_list(self, room_id, store, cache_context):
|
||||
member_list = yield store.get_users_in_room(
|
||||
room_id, on_invalidate=cache_context.invalidate
|
||||
@defer.inlineCallbacks
|
||||
def _matches_user_in_member_list(self, event, store):
|
||||
ases = yield store.get_appservices_with_user_in_room(
|
||||
event,
|
||||
)
|
||||
|
||||
# check joined member events
|
||||
for user_id in member_list:
|
||||
if self.is_interested_in_user(user_id):
|
||||
defer.returnValue(True)
|
||||
defer.returnValue(False)
|
||||
defer.returnValue(self.id in ases)
|
||||
|
||||
def _matches_room_id(self, event):
|
||||
if hasattr(event, "room_id"):
|
||||
|
|
|
@ -17,8 +17,10 @@
|
|||
from synapse.storage.appservice import (
|
||||
ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
|
||||
)
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
|
||||
|
||||
class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
|
||||
ApplicationServiceWorkerStore):
|
||||
ApplicationServiceWorkerStore,
|
||||
SlavedEventStore):
|
||||
pass
|
||||
|
|
|
@ -18,9 +18,14 @@ import re
|
|||
import simplejson as json
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.appservice import AppServiceTransaction
|
||||
from synapse.config.appservice import load_appservices
|
||||
from synapse.storage.events import EventsWorkerStore
|
||||
from synapse.storage.roommember import RoomMemberWorkerStore
|
||||
from synapse.storage.state import StateGroupWorkerStore
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.async import Linearizer
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
|
||||
|
@ -46,7 +51,8 @@ def _make_exclusive_regex(services_cache):
|
|||
return exclusive_user_regex
|
||||
|
||||
|
||||
class ApplicationServiceWorkerStore(SQLBaseStore):
|
||||
class ApplicationServiceWorkerStore(RoomMemberWorkerStore, StateGroupWorkerStore,
|
||||
SQLBaseStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
self.services_cache = load_appservices(
|
||||
hs.hostname,
|
||||
|
@ -111,6 +117,38 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
|
|||
return service
|
||||
return None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_appservices_with_user_in_room(self, event):
|
||||
"""Get the list of appservices in the room at the given event
|
||||
|
||||
Args:
|
||||
event (Event)
|
||||
|
||||
Returns:
|
||||
Deferred[set(str)]: The IDs of all ASes in the room
|
||||
"""
|
||||
state_group = yield self._get_state_group_for_event(event.event_id)
|
||||
|
||||
if not state_group:
|
||||
raise Exception("No state group for event %s", event.event_id)
|
||||
|
||||
ases_in_room = yield self._get_appservices_with_user_in_room(
|
||||
event.room_id, state_group,
|
||||
)
|
||||
|
||||
defer.returnValue(ases_in_room)
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, max_entries=10000)
|
||||
def _get_appservices_with_user_in_room(self, room_id, state_group):
|
||||
cache = self._get_appservices_with_user_in_room_cache(room_id)
|
||||
ases_in_room = yield cache.get_appservices_in_room_by_user(state_group)
|
||||
|
||||
defer.returnValue(ases_in_room)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def _get_appservices_with_user_in_room_cache(self, room_id):
|
||||
return _AppserviceUsersCache(self, room_id)
|
||||
|
||||
|
||||
class ApplicationServiceStore(ApplicationServiceWorkerStore):
|
||||
# This is currently empty due to there not being any AS storage functions
|
||||
|
@ -346,6 +384,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
|
|||
" (SELECT stream_ordering FROM appservice_stream_position)"
|
||||
" < e.stream_ordering"
|
||||
" AND e.stream_ordering <= ?"
|
||||
" AND NOT e.outlier"
|
||||
" ORDER BY e.stream_ordering ASC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
|
@ -374,3 +413,119 @@ class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStor
|
|||
# to keep consistency with the other stores, we keep this empty class for
|
||||
# now.
|
||||
pass
|
||||
|
||||
|
||||
class _AppserviceUsersCache(object):
|
||||
"""Attempts to calculate which appservices have users in a given room by
|
||||
looking at state groups and their delta_ids
|
||||
"""
|
||||
|
||||
def __init__(self, store, room_id):
|
||||
self.store = store
|
||||
self.room_id = room_id
|
||||
|
||||
self.linearizer = Linearizer("_AppserviceUsersCache")
|
||||
|
||||
# The last state group we calculated the ASes in the room for.
|
||||
self.state_group = object()
|
||||
|
||||
# A dict of all appservices in the room at the above state group,
|
||||
# along with a user_id of an AS user in the room.
|
||||
# Dict of as_id -> user_id.
|
||||
self.appservices_in_room = {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_appservices_in_room_by_user(self, state_group):
|
||||
"""
|
||||
Args:
|
||||
state_group(str)
|
||||
|
||||
Returns:
|
||||
Deferred[set(str)]: The IDs of all ASes in the room
|
||||
"""
|
||||
assert state_group is not None
|
||||
|
||||
if state_group == self.state_group:
|
||||
defer.returnValue(frozenset(self.appservices_in_room))
|
||||
|
||||
with (yield self.linearizer.queue(())):
|
||||
# Set of ASes that we need to recalculate their membership of
|
||||
# the room
|
||||
uhandled_ases = set()
|
||||
|
||||
# If the state groups match then there is nothing to do
|
||||
if state_group == self.state_group:
|
||||
defer.returnValue(frozenset(self.appservices_in_room))
|
||||
|
||||
prev_group, delta_ids = yield self.store.get_state_group_delta(state_group)
|
||||
|
||||
# If the prev_group matches the last state group we can calculate
|
||||
# the new value by looking at the deltas
|
||||
if prev_group and prev_group == self.state_group:
|
||||
for (typ, state_key), event_id in delta_ids.iteritems():
|
||||
if typ != EventTypes.Member:
|
||||
continue
|
||||
|
||||
user_id = state_key
|
||||
|
||||
event = yield self.store.get_event(event_id)
|
||||
|
||||
is_join = event.membership == Membership.JOIN
|
||||
for appservice in self.store.get_app_services():
|
||||
as_id = appservice.id
|
||||
|
||||
# If this is a join and the appservice is already in
|
||||
# the room then its a noop
|
||||
if is_join:
|
||||
if as_id in self.appservices_in_room:
|
||||
continue
|
||||
# If this is not a join, then we only need to recalculate
|
||||
# if the AS is in the room and the cached joined AS user
|
||||
# matches this event.
|
||||
elif self.appservices_in_room.get(as_id, None) != user_id:
|
||||
continue
|
||||
|
||||
# If the AS is not interested in the user then its a
|
||||
# noop.
|
||||
if not appservice.is_interested_in_user(user_id):
|
||||
continue
|
||||
|
||||
if is_join:
|
||||
# If an AS user is joining then the AS is now
|
||||
# interested in the room
|
||||
self.appservices_in_room[as_id] = user_id
|
||||
else:
|
||||
# If an AS user has left then we need to
|
||||
# recalcualte if they're in the room.
|
||||
uhandled_ases.add(appservice)
|
||||
self.appservices_in_room.pop(as_id, None)
|
||||
else:
|
||||
uhandled_ases = set(self.store.get_app_services())
|
||||
|
||||
if uhandled_ases:
|
||||
# We need to recalculate which ASes are in the room, so lets
|
||||
# get the current state and try and find a join event
|
||||
# that the AS is interested in.
|
||||
|
||||
current_state_ids = yield self.store.get_state_ids_for_group(state_group)
|
||||
|
||||
for appservice in uhandled_ases:
|
||||
as_id = appservice.id
|
||||
|
||||
self.appservices_in_room.pop(as_id, None)
|
||||
|
||||
for (etype, state_key), event_id in current_state_ids.iteritems():
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
|
||||
if not appservice.is_interested_in_user(state_key):
|
||||
continue
|
||||
|
||||
event = yield self.store.get_event(event_id)
|
||||
if event.membership == Membership.JOIN:
|
||||
self.appservices_in_room[as_id] = state_key
|
||||
break
|
||||
|
||||
self.state_group = state_group
|
||||
|
||||
defer.returnValue(frozenset(self.appservices_in_room))
|
||||
|
|
|
@ -440,7 +440,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
)
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
|
||||
# @defer.inlineCallbacks
|
||||
def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
|
||||
# We don't use `state_group`, its there so that we can cache based
|
||||
# on it. However, its important that its never None, since two current_state's
|
||||
|
|
|
@ -55,7 +55,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||
_regex("@irc_.*")
|
||||
)
|
||||
self.event.sender = "@irc_foobar:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_user_id_prefix_no_match(self):
|
||||
|
@ -63,7 +63,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||
_regex("@irc_.*")
|
||||
)
|
||||
self.event.sender = "@someone_else:matrix.org"
|
||||
self.assertFalse((yield self.service.is_interested(self.event)))
|
||||
self.assertFalse((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_room_member_is_checked(self):
|
||||
|
@ -73,7 +73,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||
self.event.sender = "@someone_else:matrix.org"
|
||||
self.event.type = "m.room.member"
|
||||
self.event.state_key = "@irc_foobar:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_room_id_match(self):
|
||||
|
@ -81,7 +81,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||
_regex("!some_prefix.*some_suffix:matrix.org")
|
||||
)
|
||||
self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
self.assertTrue((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_room_id_no_match(self):
|
||||
|
@ -89,7 +89,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||
_regex("!some_prefix.*some_suffix:matrix.org")
|
||||
)
|
||||
self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
|
||||
self.assertFalse((yield self.service.is_interested(self.event)))
|
||||
self.assertFalse((yield self.service.is_interested(self.event, None)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_regex_alias_match(self):
|
||||
|
@ -160,7 +160,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||
self.store.get_aliases_for_room.return_value = [
|
||||
"#xmpp_foobar:matrix.org", "#athing:matrix.org"
|
||||
]
|
||||
self.store.get_users_in_room.return_value = []
|
||||
self.store.get_appservices_with_user_in_room.return_value = []
|
||||
self.assertFalse((yield self.service.is_interested(
|
||||
self.event, self.store
|
||||
)))
|
||||
|
@ -193,20 +193,3 @@ class ApplicationServiceTestCase(unittest.TestCase):
|
|||
}
|
||||
self.event.state_key = self.service.sender
|
||||
self.assertTrue((yield self.service.is_interested(self.event)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_member_list_match(self):
|
||||
self.service.namespaces[ApplicationService.NS_USERS].append(
|
||||
_regex("@irc_.*")
|
||||
)
|
||||
self.store.get_users_in_room.return_value = [
|
||||
"@alice:here",
|
||||
"@irc_fo:here", # AS user
|
||||
"@bob:here",
|
||||
]
|
||||
self.store.get_aliases_for_room.return_value = []
|
||||
|
||||
self.event.sender = "@xmpp_foobar:matrix.org"
|
||||
self.assertTrue((yield self.service.is_interested(
|
||||
event=self.event, store=self.store
|
||||
)))
|
||||
|
|
Loading…
Reference in a new issue