forked from MirrorHub/synapse
Use state to calculate get_users_in_room
This commit is contained in:
parent
43db0d9f6a
commit
c8285564a3
5 changed files with 47 additions and 30 deletions
|
@ -40,7 +40,7 @@ class ActionGenerator:
|
||||||
def handle_push_actions_for_event(self, event, context):
|
def handle_push_actions_for_event(self, event, context):
|
||||||
with Measure(self.clock, "handle_push_actions_for_event"):
|
with Measure(self.clock, "handle_push_actions_for_event"):
|
||||||
bulk_evaluator = yield evaluator_for_event(
|
bulk_evaluator = yield evaluator_for_event(
|
||||||
event, self.hs, self.store
|
event, self.hs, self.store, context.current_state
|
||||||
)
|
)
|
||||||
|
|
||||||
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
|
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
|
||||||
|
|
|
@ -21,7 +21,7 @@ from twisted.internet import defer
|
||||||
from .baserules import list_with_base_rules
|
from .baserules import list_with_base_rules
|
||||||
from .push_rule_evaluator import PushRuleEvaluatorForEvent
|
from .push_rule_evaluator import PushRuleEvaluatorForEvent
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.visibility import filter_events_for_clients
|
from synapse.visibility import filter_events_for_clients
|
||||||
|
|
||||||
|
|
||||||
|
@ -72,20 +72,24 @@ def _get_rules(room_id, user_ids, store):
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def evaluator_for_event(event, hs, store):
|
def evaluator_for_event(event, hs, store, current_state):
|
||||||
room_id = event.room_id
|
room_id = event.room_id
|
||||||
|
|
||||||
# users in the room who have pushers need to get push rules run because
|
|
||||||
# that's how their pushers work
|
|
||||||
users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
|
|
||||||
|
|
||||||
# We also will want to generate notifs for other people in the room so
|
# We also will want to generate notifs for other people in the room so
|
||||||
# their unread countss are correct in the event stream, but to avoid
|
# their unread countss are correct in the event stream, but to avoid
|
||||||
# generating them for bot / AS users etc, we only do so for people who've
|
# generating them for bot / AS users etc, we only do so for people who've
|
||||||
# sent a read receipt into the room.
|
# sent a read receipt into the room.
|
||||||
|
|
||||||
all_in_room = yield store.get_users_in_room(room_id)
|
all_in_room = set(
|
||||||
all_in_room = set(all_in_room)
|
e.state_key for e in current_state.values()
|
||||||
|
if e.type == EventTypes.Member and e.membership == Membership.JOIN
|
||||||
|
)
|
||||||
|
|
||||||
|
# users in the room who have pushers need to get push rules run because
|
||||||
|
# that's how their pushers work
|
||||||
|
if_users_with_pushers = yield store.get_if_users_have_pushers(all_in_room)
|
||||||
|
users_with_pushers = set(
|
||||||
|
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
|
||||||
|
)
|
||||||
|
|
||||||
users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
|
users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
|
||||||
|
|
||||||
|
@ -143,7 +147,10 @@ class BulkPushRuleEvaluator:
|
||||||
self.store, user_tuples, [event], {event.event_id: current_state}
|
self.store, user_tuples, [event], {event.event_id: current_state}
|
||||||
)
|
)
|
||||||
|
|
||||||
room_members = yield self.store.get_users_in_room(self.room_id)
|
room_members = set(
|
||||||
|
e.state_key for e in current_state.values()
|
||||||
|
if e.type == EventTypes.Member and e.membership == Membership.JOIN
|
||||||
|
)
|
||||||
|
|
||||||
evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
|
evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
|
||||||
|
|
||||||
|
|
|
@ -342,9 +342,6 @@ class EventsStore(SQLBaseStore):
|
||||||
txn.call_after(self._get_current_state_for_key.invalidate_all)
|
txn.call_after(self._get_current_state_for_key.invalidate_all)
|
||||||
txn.call_after(self.get_rooms_for_user.invalidate_all)
|
txn.call_after(self.get_rooms_for_user.invalidate_all)
|
||||||
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
|
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
|
||||||
txn.call_after(
|
|
||||||
self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
|
|
||||||
)
|
|
||||||
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
|
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
|
||||||
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
|
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@ from twisted.internet import defer
|
||||||
|
|
||||||
from canonicaljson import encode_canonical_json
|
from canonicaljson import encode_canonical_json
|
||||||
|
|
||||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
|
@ -135,19 +135,35 @@ class PusherStore(SQLBaseStore):
|
||||||
"get_all_updated_pushers", get_all_updated_pushers_txn
|
"get_all_updated_pushers", get_all_updated_pushers_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=1)
|
@cachedInlineCallbacks(lru=True, num_args=1)
|
||||||
def get_users_with_pushers_in_room(self, room_id):
|
def get_if_user_has_pusher(self, user_id):
|
||||||
users = yield self.get_users_in_room(room_id)
|
|
||||||
|
|
||||||
result = yield self._simple_select_many_batch(
|
result = yield self._simple_select_many_batch(
|
||||||
table='pushers',
|
table='pushers',
|
||||||
column='user_name',
|
keyvalues={
|
||||||
iterable=users,
|
'user_name': 'user_id',
|
||||||
retcols=['user_name'],
|
},
|
||||||
desc='get_users_with_pushers_in_room'
|
retcol='user_name',
|
||||||
|
desc='get_if_user_has_pusher',
|
||||||
|
allow_none=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue([r['user_name'] for r in result])
|
defer.returnValue(bool(result))
|
||||||
|
|
||||||
|
@cachedList(cached_method_name="get_if_user_has_pusher",
|
||||||
|
list_name="user_ids", num_args=1, inlineCallbacks=True)
|
||||||
|
def get_if_users_have_pushers(self, user_ids):
|
||||||
|
rows = yield self._simple_select_many_batch(
|
||||||
|
table='pushers',
|
||||||
|
column='user_name',
|
||||||
|
iterable=user_ids,
|
||||||
|
retcols=['user_name'],
|
||||||
|
desc='get_if_users_have_pushers'
|
||||||
|
)
|
||||||
|
|
||||||
|
result = {user_id: False for user_id in user_ids}
|
||||||
|
result.update({r['user_name']: True for r in rows})
|
||||||
|
|
||||||
|
defer.returnValue(result)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def add_pusher(self, user_id, access_token, kind, app_id,
|
def add_pusher(self, user_id, access_token, kind, app_id,
|
||||||
|
@ -178,16 +194,16 @@ class PusherStore(SQLBaseStore):
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if newly_inserted:
|
if newly_inserted:
|
||||||
# get_users_with_pushers_in_room only cares if the user has
|
# get_if_user_has_pusher only cares if the user has
|
||||||
# at least *one* pusher.
|
# at least *one* pusher.
|
||||||
txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
|
txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
|
||||||
|
|
||||||
yield self.runInteraction("add_pusher", f)
|
yield self.runInteraction("add_pusher", f)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
|
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
|
||||||
def delete_pusher_txn(txn, stream_id):
|
def delete_pusher_txn(txn, stream_id):
|
||||||
txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
|
txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
|
||||||
|
|
||||||
self._simple_delete_one_txn(
|
self._simple_delete_one_txn(
|
||||||
txn,
|
txn,
|
||||||
|
|
|
@ -58,9 +58,6 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
|
txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
|
||||||
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
|
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
|
||||||
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
|
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
|
||||||
txn.call_after(
|
|
||||||
self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
|
|
||||||
)
|
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self._membership_stream_cache.entity_has_changed,
|
self._membership_stream_cache.entity_has_changed,
|
||||||
event.state_key, event.internal_metadata.stream_ordering
|
event.state_key, event.internal_metadata.stream_ordering
|
||||||
|
|
Loading…
Reference in a new issue