forked from MirrorHub/synapse
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/state_storage
This commit is contained in:
commit
9e6d88f4e2
5 changed files with 35 additions and 60 deletions
|
@ -832,11 +832,13 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
new_pdu = event
|
||||
|
||||
message_handler = self.hs.get_handlers().message_handler
|
||||
destinations = yield message_handler.get_joined_hosts_for_room_from_state(
|
||||
context
|
||||
users_in_room = yield self.store.get_joined_users_from_context(event, context)
|
||||
|
||||
destinations = set(
|
||||
get_domain_from_id(user_id) for user_id in users_in_room
|
||||
if not self.hs.is_mine_id(user_id)
|
||||
)
|
||||
destinations = set(destinations)
|
||||
|
||||
destinations.discard(origin)
|
||||
|
||||
logger.debug(
|
||||
|
@ -1055,11 +1057,12 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
new_pdu = event
|
||||
|
||||
message_handler = self.hs.get_handlers().message_handler
|
||||
destinations = yield message_handler.get_joined_hosts_for_room_from_state(
|
||||
context
|
||||
users_in_room = yield self.store.get_joined_users_from_context(event, context)
|
||||
|
||||
destinations = set(
|
||||
get_domain_from_id(user_id) for user_id in users_in_room
|
||||
if not self.hs.is_mine_id(user_id)
|
||||
)
|
||||
destinations = set(destinations)
|
||||
destinations.discard(origin)
|
||||
|
||||
logger.debug(
|
||||
|
|
|
@ -30,7 +30,6 @@ from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLo
|
|||
from synapse.util.caches.snapshot_cache import SnapshotCache
|
||||
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
@ -945,7 +944,12 @@ class MessageHandler(BaseHandler):
|
|||
event_stream_id, max_stream_id
|
||||
)
|
||||
|
||||
destinations = yield self.get_joined_hosts_for_room_from_state(context)
|
||||
users_in_room = yield self.store.get_joined_users_from_context(event, context)
|
||||
|
||||
destinations = [
|
||||
get_domain_from_id(user_id) for user_id in users_in_room
|
||||
if not self.hs.is_mine_id(user_id)
|
||||
]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _notify():
|
||||
|
@ -963,39 +967,3 @@ class MessageHandler(BaseHandler):
|
|||
preserve_fn(federation_handler.handle_new_event)(
|
||||
event, destinations=destinations,
|
||||
)
|
||||
|
||||
def get_joined_hosts_for_room_from_state(self, context):
|
||||
state_group = context.state_group
|
||||
if not state_group:
|
||||
# If state_group is None it means it has yet to be assigned a
|
||||
# state group, i.e. we need to make sure that calls with a state_group
|
||||
# of None don't hit previous cached calls with a None state_group.
|
||||
# To do this we set the state_group to a new object as object() != object()
|
||||
state_group = object()
|
||||
|
||||
return self._get_joined_hosts_for_room_from_state(
|
||||
state_group, context.current_state_ids
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks(num_args=1, cache_context=True)
|
||||
def _get_joined_hosts_for_room_from_state(self, state_group, current_state_ids,
|
||||
cache_context):
|
||||
|
||||
# Don't bother getting state for people on the same HS
|
||||
current_state = yield self.store.get_events([
|
||||
e_id for key, e_id in current_state_ids.items()
|
||||
if key[0] == EventTypes.Member and not self.hs.is_mine_id(key[1])
|
||||
])
|
||||
|
||||
destinations = set()
|
||||
for e in current_state.itervalues():
|
||||
try:
|
||||
if e.type == EventTypes.Member:
|
||||
if e.content["membership"] == Membership.JOIN:
|
||||
destinations.add(get_domain_from_id(e.state_key))
|
||||
except SynapseError:
|
||||
logger.warn(
|
||||
"Failed to get destination from event %s", e.event_id
|
||||
)
|
||||
|
||||
defer.returnValue(destinations)
|
||||
|
|
|
@ -52,6 +52,8 @@ bump_active_time_counter = metrics.register_counter("bump_active_time")
|
|||
|
||||
get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
|
||||
|
||||
notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
|
||||
|
||||
|
||||
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
|
||||
# "currently_active"
|
||||
|
@ -940,26 +942,32 @@ def should_notify(old_state, new_state):
|
|||
"""Decides if a presence state change should be sent to interested parties.
|
||||
"""
|
||||
if old_state.status_msg != new_state.status_msg:
|
||||
notify_reason_counter.inc("status_msg_change")
|
||||
return True
|
||||
|
||||
if old_state.state == PresenceState.ONLINE:
|
||||
if new_state.state != PresenceState.ONLINE:
|
||||
# Always notify for online -> anything
|
||||
notify_reason_counter.inc("online_to_not")
|
||||
return True
|
||||
|
||||
if new_state.currently_active != old_state.currently_active:
|
||||
notify_reason_counter.inc("current_active_change")
|
||||
return True
|
||||
|
||||
if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
|
||||
# Only notify about last active bumps if we're not currently acive
|
||||
if not (old_state.currently_active and new_state.currently_active):
|
||||
notify_reason_counter.inc("last_active_change")
|
||||
return True
|
||||
|
||||
elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
|
||||
# Always notify for a transition where last active gets bumped.
|
||||
notify_reason_counter.inc("last_active_change")
|
||||
return True
|
||||
|
||||
if old_state.state != new_state.state:
|
||||
notify_reason_counter.inc("state_change")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
|
|
@ -38,7 +38,7 @@ def _get_rules(room_id, user_ids, store):
|
|||
@defer.inlineCallbacks
|
||||
def evaluator_for_event(event, hs, store, context):
|
||||
rules_by_user = yield store.bulk_get_push_rules_for_room(
|
||||
event.room_id, context
|
||||
event, context
|
||||
)
|
||||
|
||||
# if this event is an invite event, we may need to run rules for the user
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
from ._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
|
||||
from synapse.push.baserules import list_with_base_rules
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from twisted.internet import defer
|
||||
|
||||
import logging
|
||||
|
@ -124,7 +123,7 @@ class PushRuleStore(SQLBaseStore):
|
|||
|
||||
defer.returnValue(results)
|
||||
|
||||
def bulk_get_push_rules_for_room(self, room_id, context):
|
||||
def bulk_get_push_rules_for_room(self, event, context):
|
||||
state_group = context.state_group
|
||||
if not state_group:
|
||||
# If state_group is None it means it has yet to be assigned a
|
||||
|
@ -134,12 +133,12 @@ class PushRuleStore(SQLBaseStore):
|
|||
state_group = object()
|
||||
|
||||
return self._bulk_get_push_rules_for_room(
|
||||
room_id, state_group, context.current_state_ids
|
||||
event.room_id, state_group, context.current_state_ids, event=event
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, cache_context=True)
|
||||
def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids,
|
||||
cache_context):
|
||||
cache_context, event=None):
|
||||
# We don't use `state_group`, its there so that we can cache based
|
||||
# on it. However, its important that its never None, since two current_state's
|
||||
# with a state_group of None are likely to be different.
|
||||
|
@ -150,18 +149,15 @@ class PushRuleStore(SQLBaseStore):
|
|||
# their unread countss are correct in the event stream, but to avoid
|
||||
# generating them for bot / AS users etc, we only do so for people who've
|
||||
# sent a read receipt into the room.
|
||||
local_user_member_ids = [
|
||||
e_id for (etype, state_key), e_id in current_state_ids.iteritems()
|
||||
if etype == EventTypes.Member and self.hs.is_mine_id(state_key)
|
||||
]
|
||||
|
||||
local_member_events = yield self._get_events(local_user_member_ids)
|
||||
|
||||
local_users_in_room = set(
|
||||
member_event.state_key for member_event in local_member_events
|
||||
if member_event.membership == Membership.JOIN
|
||||
users_in_room = yield self._get_joined_users_from_context(
|
||||
room_id, state_group, current_state_ids,
|
||||
on_invalidate=cache_context.invalidate,
|
||||
event=event,
|
||||
)
|
||||
|
||||
local_users_in_room = set(u for u in users_in_room if self.hs.is_mine_id(u))
|
||||
|
||||
# users in the room who have pushers need to get push rules run because
|
||||
# that's how their pushers work
|
||||
if_users_with_pushers = yield self.get_if_users_have_pushers(
|
||||
|
|
Loading…
Reference in a new issue