Compare commits

...

3 commits

Author SHA1 Message Date
Erik Johnston 86090eadb0 Don't send outlier events to ASes 2018-04-06 10:21:17 +01:00
Erik Johnston edbeed06ca Fix MRO for replication stores 2018-04-06 10:20:50 +01:00
Erik Johnston 277d2c506d Add cache for if ASes have users in a room 2018-04-05 17:28:27 +01:00
9 changed files with 178 additions and 41 deletions

View file

@ -42,7 +42,7 @@ logger = logging.getLogger("synapse.app.appservice")
class AppserviceSlaveStore(
DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
DirectoryStore, SlavedApplicationServiceStore, SlavedEventStore,
SlavedRegistrationStore,
):
pass

View file

@ -50,11 +50,11 @@ logger = logging.getLogger("synapse.app.client_reader")
class ClientReaderSlavedStore(
SlavedApplicationServiceStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
TransactionStore,
SlavedClientIpStore,

View file

@ -64,6 +64,7 @@ logger = logging.getLogger("synapse.app.synchrotron")
class SynchrotronSlavedStore(
SlavedReceiptsStore,
SlavedAccountDataStore,
SlavedPushRuleStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedFilteringStore,
@ -71,7 +72,6 @@ class SynchrotronSlavedStore(
SlavedGroupServerStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedPushRuleStore,
SlavedEventStore,
SlavedClientIpStore,
RoomStore,

View file

@ -49,8 +49,8 @@ logger = logging.getLogger("synapse.app.user_dir")
class UserDirectorySlaveStore(
SlavedEventStore,
SlavedApplicationServiceStore,
SlavedEventStore,
SlavedRegistrationStore,
SlavedClientIpStore,
UserDirectoryStore,

View file

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.constants import EventTypes
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import GroupID, get_domain_from_id
from twisted.internet import defer
@ -173,6 +172,7 @@ class ApplicationService(object):
if self.is_interested_in_user(event.sender):
defer.returnValue(True)
# also check m.room.member state key
if (event.type == EventTypes.Member and
self.is_interested_in_user(event.state_key)):
@ -181,20 +181,18 @@ class ApplicationService(object):
if not store:
defer.returnValue(False)
does_match = yield self._matches_user_in_member_list(event.room_id, store)
does_match = yield self._matches_user_in_member_list(
event, store,
)
defer.returnValue(does_match)
@cachedInlineCallbacks(num_args=1, cache_context=True)
def _matches_user_in_member_list(self, room_id, store, cache_context):
member_list = yield store.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
@defer.inlineCallbacks
def _matches_user_in_member_list(self, event, store):
ases = yield store.get_appservices_with_user_in_room(
event,
)
# check joined member events
for user_id in member_list:
if self.is_interested_in_user(user_id):
defer.returnValue(True)
defer.returnValue(False)
defer.returnValue(self.id in ases)
def _matches_room_id(self, event):
if hasattr(event, "room_id"):

View file

@ -17,8 +17,10 @@
from synapse.storage.appservice import (
ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
)
from synapse.replication.slave.storage.events import SlavedEventStore
class SlavedApplicationServiceStore(ApplicationServiceTransactionWorkerStore,
ApplicationServiceWorkerStore):
ApplicationServiceWorkerStore,
SlavedEventStore):
pass

View file

@ -18,9 +18,14 @@ import re
import simplejson as json
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices
from synapse.storage.events import EventsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.async import Linearizer
from ._base import SQLBaseStore
@ -46,7 +51,8 @@ def _make_exclusive_regex(services_cache):
return exclusive_user_regex
class ApplicationServiceWorkerStore(SQLBaseStore):
class ApplicationServiceWorkerStore(RoomMemberWorkerStore, StateGroupWorkerStore,
SQLBaseStore):
def __init__(self, db_conn, hs):
self.services_cache = load_appservices(
hs.hostname,
@ -111,6 +117,38 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
return service
return None
@defer.inlineCallbacks
def get_appservices_with_user_in_room(self, event):
"""Get the list of appservices in the room at the given event
Args:
event (Event)
Returns:
Deferred[set(str)]: The IDs of all ASes in the room
"""
state_group = yield self._get_state_group_for_event(event.event_id)
if not state_group:
raise Exception("No state group for event %s", event.event_id)
ases_in_room = yield self._get_appservices_with_user_in_room(
event.room_id, state_group,
)
defer.returnValue(ases_in_room)
@cachedInlineCallbacks(num_args=2, max_entries=10000)
def _get_appservices_with_user_in_room(self, room_id, state_group):
cache = self._get_appservices_with_user_in_room_cache(room_id)
ases_in_room = yield cache.get_appservices_in_room_by_user(state_group)
defer.returnValue(ases_in_room)
@cached(max_entries=10000)
def _get_appservices_with_user_in_room_cache(self, room_id):
return _AppserviceUsersCache(self, room_id)
class ApplicationServiceStore(ApplicationServiceWorkerStore):
# This is currently empty due to there not being any AS storage functions
@ -346,6 +384,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
" (SELECT stream_ordering FROM appservice_stream_position)"
" < e.stream_ordering"
" AND e.stream_ordering <= ?"
" AND NOT e.outlier"
" ORDER BY e.stream_ordering ASC"
" LIMIT ?"
)
@ -374,3 +413,119 @@ class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStor
# to keep consistency with the other stores, we keep this empty class for
# now.
pass
class _AppserviceUsersCache(object):
"""Attempts to calculate which appservices have users in a given room by
looking at state groups and their delta_ids
"""
def __init__(self, store, room_id):
self.store = store
self.room_id = room_id
self.linearizer = Linearizer("_AppserviceUsersCache")
# The last state group we calculated the ASes in the room for.
self.state_group = object()
# A dict of all appservices in the room at the above state group,
# along with a user_id of an AS user in the room.
# Dict of as_id -> user_id.
self.appservices_in_room = {}
@defer.inlineCallbacks
def get_appservices_in_room_by_user(self, state_group):
"""
Args:
state_group(str)
Returns:
Deferred[set(str)]: The IDs of all ASes in the room
"""
assert state_group is not None
if state_group == self.state_group:
defer.returnValue(frozenset(self.appservices_in_room))
with (yield self.linearizer.queue(())):
# Set of ASes that we need to recalculate their membership of
# the room
uhandled_ases = set()
# If the state groups match then there is nothing to do
if state_group == self.state_group:
defer.returnValue(frozenset(self.appservices_in_room))
prev_group, delta_ids = yield self.store.get_state_group_delta(state_group)
# If the prev_group matches the last state group we can calculate
# the new value by looking at the deltas
if prev_group and prev_group == self.state_group:
for (typ, state_key), event_id in delta_ids.iteritems():
if typ != EventTypes.Member:
continue
user_id = state_key
event = yield self.store.get_event(event_id)
is_join = event.membership == Membership.JOIN
for appservice in self.store.get_app_services():
as_id = appservice.id
# If this is a join and the appservice is already in
# the room then its a noop
if is_join:
if as_id in self.appservices_in_room:
continue
# If this is not a join, then we only need to recalculate
# if the AS is in the room and the cached joined AS user
# matches this event.
elif self.appservices_in_room.get(as_id, None) != user_id:
continue
# If the AS is not interested in the user then its a
# noop.
if not appservice.is_interested_in_user(user_id):
continue
if is_join:
# If an AS user is joining then the AS is now
# interested in the room
self.appservices_in_room[as_id] = user_id
else:
# If an AS user has left then we need to
# recalcualte if they're in the room.
uhandled_ases.add(appservice)
self.appservices_in_room.pop(as_id, None)
else:
uhandled_ases = set(self.store.get_app_services())
if uhandled_ases:
# We need to recalculate which ASes are in the room, so lets
# get the current state and try and find a join event
# that the AS is interested in.
current_state_ids = yield self.store.get_state_ids_for_group(state_group)
for appservice in uhandled_ases:
as_id = appservice.id
self.appservices_in_room.pop(as_id, None)
for (etype, state_key), event_id in current_state_ids.iteritems():
if etype != EventTypes.Member:
continue
if not appservice.is_interested_in_user(state_key):
continue
event = yield self.store.get_event(event_id)
if event.membership == Membership.JOIN:
self.appservices_in_room[as_id] = state_key
break
self.state_group = state_group
defer.returnValue(frozenset(self.appservices_in_room))

View file

@ -440,7 +440,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
# @defer.inlineCallbacks
def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
# We don't use `state_group`, its there so that we can cache based
# on it. However, its important that its never None, since two current_state's

View file

@ -55,7 +55,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("@irc_.*")
)
self.event.sender = "@irc_foobar:matrix.org"
self.assertTrue((yield self.service.is_interested(self.event)))
self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_user_id_prefix_no_match(self):
@ -63,7 +63,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("@irc_.*")
)
self.event.sender = "@someone_else:matrix.org"
self.assertFalse((yield self.service.is_interested(self.event)))
self.assertFalse((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_room_member_is_checked(self):
@ -73,7 +73,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
self.event.sender = "@someone_else:matrix.org"
self.event.type = "m.room.member"
self.event.state_key = "@irc_foobar:matrix.org"
self.assertTrue((yield self.service.is_interested(self.event)))
self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_room_id_match(self):
@ -81,7 +81,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("!some_prefix.*some_suffix:matrix.org")
)
self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org"
self.assertTrue((yield self.service.is_interested(self.event)))
self.assertTrue((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_room_id_no_match(self):
@ -89,7 +89,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("!some_prefix.*some_suffix:matrix.org")
)
self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org"
self.assertFalse((yield self.service.is_interested(self.event)))
self.assertFalse((yield self.service.is_interested(self.event, None)))
@defer.inlineCallbacks
def test_regex_alias_match(self):
@ -160,7 +160,7 @@ class ApplicationServiceTestCase(unittest.TestCase):
self.store.get_aliases_for_room.return_value = [
"#xmpp_foobar:matrix.org", "#athing:matrix.org"
]
self.store.get_users_in_room.return_value = []
self.store.get_appservices_with_user_in_room.return_value = []
self.assertFalse((yield self.service.is_interested(
self.event, self.store
)))
@ -193,20 +193,3 @@ class ApplicationServiceTestCase(unittest.TestCase):
}
self.event.state_key = self.service.sender
self.assertTrue((yield self.service.is_interested(self.event)))
@defer.inlineCallbacks
def test_member_list_match(self):
self.service.namespaces[ApplicationService.NS_USERS].append(
_regex("@irc_.*")
)
self.store.get_users_in_room.return_value = [
"@alice:here",
"@irc_fo:here", # AS user
"@bob:here",
]
self.store.get_aliases_for_room.return_value = []
self.event.sender = "@xmpp_foobar:matrix.org"
self.assertTrue((yield self.service.is_interested(
event=self.event, store=self.store
)))