Avoid rebuilding Edu objects in worker mode (#4770)

In worker mode, on the federation sender, when we receive an edu for sending
over the replication socket, it is parsed into an Edu object. There is no point
extracting the contents of it so that we can then immediately build another Edu.
This commit is contained in:
Richard van der Hoff 2019-03-04 12:57:44 +00:00 committed by GitHub
parent 2c3548d9d8
commit 856c83f5f8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 37 additions and 19 deletions

1
changelog.d/4770.misc Normal file
View file

@ -0,0 +1 @@
Optimise EDU transmission for the federation_sender worker.

View file

@ -159,8 +159,12 @@ class FederationRemoteSendQueue(object):
# stream. # stream.
pass pass
def send_edu(self, destination, edu_type, content, key=None): def build_and_send_edu(self, destination, edu_type, content, key=None):
"""As per TransactionQueue""" """As per TransactionQueue"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
pos = self._next_pos() pos = self._next_pos()
edu = Edu( edu = Edu(
@ -465,15 +469,11 @@ def process_rows_for_federation(transaction_queue, rows):
for destination, edu_map in iteritems(buff.keyed_edus): for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items(): for key, edu in edu_map.items():
transaction_queue.send_edu( transaction_queue.send_edu(edu, key)
edu.destination, edu.edu_type, edu.content, key=key,
)
for destination, edu_list in iteritems(buff.edus): for destination, edu_list in iteritems(buff.edus):
for edu in edu_list: for edu in edu_list:
transaction_queue.send_edu( transaction_queue.send_edu(edu, None)
edu.destination, edu.edu_type, edu.content, key=None,
)
for destination in buff.device_destinations: for destination in buff.device_destinations:
transaction_queue.send_device_messages(destination) transaction_queue.send_device_messages(destination)

View file

@ -361,7 +361,19 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination) self._attempt_new_transaction(destination)
def send_edu(self, destination, edu_type, content, key=None): def build_and_send_edu(self, destination, edu_type, content, key=None):
"""Construct an Edu object, and queue it for sending
Args:
destination (str): name of server to send to
edu_type (str): type of EDU to send
content (dict): content of EDU
key (Any|None): clobbering key for this edu
"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
edu = Edu( edu = Edu(
origin=self.server_name, origin=self.server_name,
destination=destination, destination=destination,
@ -369,18 +381,23 @@ class TransactionQueue(object):
content=content, content=content,
) )
if destination == self.server_name: self.send_edu(edu, key)
logger.info("Not sending EDU to ourselves")
return
def send_edu(self, edu, key):
"""Queue an EDU for sending
Args:
edu (Edu): edu to send
key (Any|None): clobbering key for this edu
"""
if key: if key:
self.pending_edus_keyed_by_dest.setdefault( self.pending_edus_keyed_by_dest.setdefault(
destination, {} edu.destination, {}
)[(edu.edu_type, key)] = edu )[(edu.edu_type, key)] = edu
else: else:
self.pending_edus_by_dest.setdefault(destination, []).append(edu) self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu)
self._attempt_new_transaction(destination) self._attempt_new_transaction(edu.destination)
def send_device_messages(self, destination): def send_device_messages(self, destination):
if destination == self.server_name: if destination == self.server_name:

View file

@ -816,7 +816,7 @@ class PresenceHandler(object):
if self.is_mine(observed_user): if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user) yield self.invite_presence(observed_user, observer_user)
else: else:
yield self.federation.send_edu( yield self.federation.build_and_send_edu(
destination=observed_user.domain, destination=observed_user.domain,
edu_type="m.presence_invite", edu_type="m.presence_invite",
content={ content={
@ -836,7 +836,7 @@ class PresenceHandler(object):
if self.is_mine(observer_user): if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user) yield self.accept_presence(observed_user, observer_user)
else: else:
self.federation.send_edu( self.federation.build_and_send_edu(
destination=observer_user.domain, destination=observer_user.domain,
edu_type="m.presence_accept", edu_type="m.presence_accept",
content={ content={
@ -848,7 +848,7 @@ class PresenceHandler(object):
state_dict = yield self.get_state(observed_user, as_event=False) state_dict = yield self.get_state(observed_user, as_event=False)
state_dict = format_user_presence_state(state_dict, self.clock.time_msec()) state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
self.federation.send_edu( self.federation.build_and_send_edu(
destination=observer_user.domain, destination=observer_user.domain,
edu_type="m.presence", edu_type="m.presence",
content={ content={

View file

@ -148,7 +148,7 @@ class ReceiptsHandler(BaseHandler):
logger.debug("Sending receipt to: %r", remotedomains) logger.debug("Sending receipt to: %r", remotedomains)
for domain in remotedomains: for domain in remotedomains:
self.federation.send_edu( self.federation.build_and_send_edu(
destination=domain, destination=domain,
edu_type="m.receipt", edu_type="m.receipt",
content={ content={

View file

@ -231,7 +231,7 @@ class TypingHandler(object):
for domain in set(get_domain_from_id(u) for u in users): for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name: if domain != self.server_name:
logger.debug("sending typing update to %s", domain) logger.debug("sending typing update to %s", domain)
self.federation.send_edu( self.federation.build_and_send_edu(
destination=domain, destination=domain,
edu_type="m.typing", edu_type="m.typing",
content={ content={