Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2019-04-02 18:25:45 +01:00
commit 613b443ff0
10 changed files with 80 additions and 9 deletions

1
changelog.d/4974.misc Normal file
View file

@ -0,0 +1 @@
Add `config.signing_key_path` that can be read by `synapse.config` utility.

1
changelog.d/4990.bugfix Normal file
View file

@ -0,0 +1 @@
Transfer related groups on room upgrade.

1
changelog.d/4991.feature Normal file
View file

@ -0,0 +1 @@
Reduce CPU usage starting pushers during start up.

View file

@ -69,6 +69,7 @@ class EventTypes(object):
Redaction = "m.room.redaction"
ThirdPartyInvite = "m.room.third_party_invite"
Encryption = "m.room.encryption"
RelatedGroups = "m.room.related_groups"
RoomHistoryVisibility = "m.room.history_visibility"
CanonicalAlias = "m.room.canonical_alias"

View file

@ -42,7 +42,8 @@ class KeyConfig(Config):
if "signing_key" in config:
self.signing_key = read_signing_keys([config["signing_key"]])
else:
self.signing_key = self.read_signing_key(config["signing_key_path"])
self.signing_key_path = config["signing_key_path"]
self.signing_key = self.read_signing_key(self.signing_key_path)
self.old_signing_keys = self.read_old_signing_keys(
config.get("old_signing_keys", {})

View file

@ -280,6 +280,7 @@ class RoomCreationHandler(BaseHandler):
(EventTypes.RoomAvatar, ""),
(EventTypes.Encryption, ""),
(EventTypes.ServerACL, ""),
(EventTypes.RelatedGroups, ""),
)
old_room_state_ids = yield self.store.get_filtered_current_state_ids(

View file

@ -72,8 +72,15 @@ class EmailPusher(object):
self._is_processing = False
def on_started(self):
if self.mailer is not None:
def on_started(self, should_check_for_notifs):
"""Called when this pusher has been started.
Args:
should_check_for_notifs (bool): Whether we should immediately
check for push to send. Set to False only if it's known there
is nothing to send
"""
if should_check_for_notifs and self.mailer is not None:
self._start_processing()
def on_stop(self):

View file

@ -116,8 +116,16 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
def on_started(self):
self._start_processing()
def on_started(self, should_check_for_notifs):
"""Called when this pusher has been started.
Args:
should_check_for_notifs (bool): Whether we should immediately
check for push to send. Set to False only if it's known there
is nothing to send
"""
if should_check_for_notifs:
self._start_processing()
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)

View file

@ -21,6 +21,7 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.util.async_helpers import concurrently_execute
logger = logging.getLogger(__name__)
@ -197,7 +198,7 @@ class PusherPool:
p = r
if p:
self._start_pusher(p)
yield self._start_pusher(p)
@defer.inlineCallbacks
def _start_pushers(self):
@ -208,10 +209,14 @@ class PusherPool:
"""
pushers = yield self.store.get_all_pushers()
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
self._start_pusher(pusherdict)
# Stagger starting up the pushers so we don't completely drown the
# process on start up.
yield concurrently_execute(self._start_pusher, pushers, 10)
logger.info("Started pushers")
@defer.inlineCallbacks
def _start_pusher(self, pusherdict):
"""Start the given pusher
@ -248,7 +253,22 @@ class PusherPool:
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
p.on_started()
# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
# push.
user_id = pusherdict["user_name"]
last_stream_ordering = pusherdict["last_stream_ordering"]
if last_stream_ordering:
have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
user_id, last_stream_ordering,
)
else:
# We always want to default to starting up the pusher rather than
# risk missing push.
have_notifs = True
p.on_started(have_notifs)
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):

View file

@ -386,6 +386,36 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# Now return the first `limit`
defer.returnValue(notifs[:limit])
def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
"""A fast check to see if there might be something to push for the
user since the given stream ordering. May return false positives.
Useful to know whether to bother starting a pusher on start up or not.
Args:
user_id (str)
min_stream_ordering (int)
Returns:
Deferred[bool]: True if there may be push to process, False if
there definitely isn't.
"""
def _get_if_maybe_push_in_range_for_user_txn(txn):
sql = """
SELECT 1 FROM event_push_actions
WHERE user_id = ? AND stream_ordering > ?
LIMIT 1
"""
txn.execute(sql, (user_id, min_stream_ordering,))
return bool(txn.fetchone())
return self.runInteraction(
"get_if_maybe_push_in_range_for_user",
_get_if_maybe_push_in_range_for_user_txn,
)
def add_push_actions_to_staging(self, event_id, user_id_actions):
"""Add the push actions for the event to the push action staging area.