Merge pull request #2875 from matrix-org/erikj/push_actions_worker

Calculate push actions on worker
This commit is contained in:
Erik Johnston 2018-03-01 11:20:07 +00:00 committed by GitHub
commit 182ff17c83
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 29 deletions

View file

@ -27,10 +27,14 @@ from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
@ -48,6 +52,10 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore( class EventCreatorSlavedStore(
SlavedAccountDataStore,
SlavedPusherStore,
SlavedReceiptsStore,
SlavedPushRuleStore,
SlavedDeviceStore, SlavedDeviceStore,
SlavedClientIpStore, SlavedClientIpStore,
SlavedApplicationServiceStore, SlavedApplicationServiceStore,

View file

@ -553,24 +553,21 @@ class EventCreationHandler(object):
event, event,
context, context,
ratelimit=True, ratelimit=True,
extra_users=[] extra_users=[],
): ):
# We now need to go and hit out to wherever we need to hit out to. """Processes a new event. This includes checking auth, persisting it,
notifying users, sending to remote servers, etc.
# If we're a worker we need to hit out to the master. If called from a worker will hit out to the master process for final
if self.config.worker_app: processing.
yield send_event_to_master(
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,
)
return
if ratelimit: Args:
yield self.base_handler.ratelimit(requester) requester (Requester)
event (FrozenEvent)
context (EventContext)
ratelimit (bool)
extra_users (list(str)): Any extra users to notify about event
"""
try: try:
yield self.auth.check_from_context(event, context) yield self.auth.check_from_context(event, context)
@ -586,6 +583,57 @@ class EventCreationHandler(object):
logger.exception("Failed to encode content: %r", event.content) logger.exception("Failed to encode content: %r", event.content)
raise raise
yield self.action_generator.handle_push_actions_for_event(
event, context
)
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,
ratelimit=ratelimit,
extra_users=extra_users,
)
return
yield self.persist_and_notify_client_event(
requester,
event,
context,
ratelimit=ratelimit,
extra_users=extra_users,
)
except: # noqa: E722, as we reraise the exception this is fine.
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
raise
@defer.inlineCallbacks
def persist_and_notify_client_event(
self,
requester,
event,
context,
ratelimit=True,
extra_users=[],
):
"""Called when we have fully built the event, have already
calculated the push actions for the event, and checked auth.
This should only be run on master.
"""
assert not self.config.worker_app
if ratelimit:
yield self.base_handler.ratelimit(requester)
yield self.base_handler.maybe_kick_guest_users(event, context) yield self.base_handler.maybe_kick_guest_users(event, context)
if event.type == EventTypes.CanonicalAlias: if event.type == EventTypes.CanonicalAlias:
@ -679,20 +727,10 @@ class EventCreationHandler(object):
"Changing the room create event is forbidden", "Changing the room create event is forbidden",
) )
yield self.action_generator.handle_push_actions_for_event( (event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context event, context=context
) )
try:
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
except: # noqa: E722, as we reraise the exception this is fine.
# Ensure that we actually remove the entries in the push actions
# staging area
preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
raise
# this intentionally does not yield: we don't care about the result # this intentionally does not yield: we don't care about the result
# and don't need to wait for it. # and don't need to wait for it.
preserve_fn(self.pusher_pool.on_new_notifications)( preserve_fn(self.pusher_pool.on_new_notifications)(

View file

@ -29,7 +29,8 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks @defer.inlineCallbacks
def send_event_to_master(client, host, port, requester, event, context): def send_event_to_master(client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master """Send event to be handled on the master
Args: Args:
@ -39,6 +40,8 @@ def send_event_to_master(client, host, port, requester, event, context):
requester (Requester) requester (Requester)
event (FrozenEvent) event (FrozenEvent)
context (EventContext) context (EventContext)
ratelimit (bool)
extra_users (list(str)): Any extra users to notify about event
""" """
uri = "http://%s:%s/_synapse/replication/send_event" % (host, port,) uri = "http://%s:%s/_synapse/replication/send_event" % (host, port,)
@ -48,6 +51,8 @@ def send_event_to_master(client, host, port, requester, event, context):
"rejected_reason": event.rejected_reason, "rejected_reason": event.rejected_reason,
"context": context.serialize(event), "context": context.serialize(event),
"requester": requester.serialize(), "requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": extra_users,
} }
try: try:
@ -74,6 +79,8 @@ class ReplicationSendEventRestServlet(RestServlet):
"rejected_reason": .., // The event.rejected_reason field "rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. }, "context": { .. serialized event context .. },
"requester": { .. serialized requester .. }, "requester": { .. serialized requester .. },
"ratelimit": true,
"extra_users": [],
} }
""" """
PATTERNS = [re.compile("^/_synapse/replication/send_event$")] PATTERNS = [re.compile("^/_synapse/replication/send_event$")]
@ -98,6 +105,9 @@ class ReplicationSendEventRestServlet(RestServlet):
requester = Requester.deserialize(self.store, content["requester"]) requester = Requester.deserialize(self.store, content["requester"])
context = yield EventContext.deserialize(self.store, content["context"]) context = yield EventContext.deserialize(self.store, content["context"])
ratelimit = content["ratelimit"]
extra_users = content["extra_users"]
if requester.user: if requester.user:
request.authenticated_entity = requester.user.to_string() request.authenticated_entity = requester.user.to_string()
@ -106,8 +116,10 @@ class ReplicationSendEventRestServlet(RestServlet):
event.event_id, event.room_id, event.event_id, event.room_id,
) )
yield self.event_creation_handler.handle_new_client_event( yield self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, requester, event, context,
ratelimit=ratelimit,
extra_users=extra_users,
) )
defer.returnValue((200, {})) defer.returnValue((200, {}))