Remove pdu_failures from transactions

The field is never read from, and all the opportunities given to populate it are not utilized. It should be very safe to remove this.
This commit is contained in:
Travis Ralston 2018-07-30 16:24:02 -06:00
parent e9b2d047f6
commit e908b86832
7 changed files with 8 additions and 97 deletions

1
changelog.d/3628.misc Normal file
View file

@ -0,0 +1 @@
Remove unused field "pdu_failures" from transactions.

View file

@ -207,10 +207,6 @@ class FederationServer(FederationBase):
edu.content edu.content
) )
pdu_failures = getattr(transaction, "pdu_failures", [])
for fail in pdu_failures:
logger.info("Got failure %r", fail)
response = { response = {
"pdus": pdu_results, "pdus": pdu_results,
} }

View file

@ -62,8 +62,6 @@ class FederationRemoteSendQueue(object):
self.edus = SortedDict() # stream position -> Edu self.edus = SortedDict() # stream position -> Edu
self.failures = SortedDict() # stream position -> (destination, Failure)
self.device_messages = SortedDict() # stream position -> destination self.device_messages = SortedDict() # stream position -> destination
self.pos = 1 self.pos = 1
@ -79,7 +77,7 @@ class FederationRemoteSendQueue(object):
for queue_name in [ for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
"edus", "failures", "device_messages", "pos_time", "edus", "device_messages", "pos_time",
]: ]:
register(queue_name, getattr(self, queue_name)) register(queue_name, getattr(self, queue_name))
@ -149,12 +147,6 @@ class FederationRemoteSendQueue(object):
for key in keys[:i]: for key in keys[:i]:
del self.edus[key] del self.edus[key]
# Delete things out of failure map
keys = self.failures.keys()
i = self.failures.bisect_left(position_to_delete)
for key in keys[:i]:
del self.failures[key]
# Delete things out of device map # Delete things out of device map
keys = self.device_messages.keys() keys = self.device_messages.keys()
i = self.device_messages.bisect_left(position_to_delete) i = self.device_messages.bisect_left(position_to_delete)
@ -204,13 +196,6 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data() self.notifier.on_new_replication_data()
def send_failure(self, failure, destination):
"""As per TransactionQueue"""
pos = self._next_pos()
self.failures[pos] = (destination, str(failure))
self.notifier.on_new_replication_data()
def send_device_messages(self, destination): def send_device_messages(self, destination):
"""As per TransactionQueue""" """As per TransactionQueue"""
pos = self._next_pos() pos = self._next_pos()
@ -285,17 +270,6 @@ class FederationRemoteSendQueue(object):
for (pos, edu) in edus: for (pos, edu) in edus:
rows.append((pos, EduRow(edu))) rows.append((pos, EduRow(edu)))
# Fetch changed failures
i = self.failures.bisect_right(from_token)
j = self.failures.bisect_right(to_token) + 1
failures = self.failures.items()[i:j]
for (pos, (destination, failure)) in failures:
rows.append((pos, FailureRow(
destination=destination,
failure=failure,
)))
# Fetch changed device messages # Fetch changed device messages
i = self.device_messages.bisect_right(from_token) i = self.device_messages.bisect_right(from_token)
j = self.device_messages.bisect_right(to_token) + 1 j = self.device_messages.bisect_right(to_token) + 1
@ -417,34 +391,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", (
buff.edus.setdefault(self.edu.destination, []).append(self.edu) buff.edus.setdefault(self.edu.destination, []).append(self.edu)
class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
"destination", # str
"failure",
))):
"""Streams failures to a remote server. Failures are issued when there was
something wrong with a transaction the remote sent us, e.g. it included
an event that was invalid.
"""
TypeId = "f"
@staticmethod
def from_data(data):
return FailureRow(
destination=data["destination"],
failure=data["failure"],
)
def to_data(self):
return {
"destination": self.destination,
"failure": self.failure,
}
def add_to_buffer(self, buff):
buff.failures.setdefault(self.destination, []).append(self.failure)
class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
"destination", # str "destination", # str
))): ))):
@ -471,7 +417,6 @@ TypeToRow = {
PresenceRow, PresenceRow,
KeyedEduRow, KeyedEduRow,
EduRow, EduRow,
FailureRow,
DeviceRow, DeviceRow,
) )
} }
@ -481,7 +426,6 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # list(UserPresenceState) "presence", # list(UserPresenceState)
"keyed_edus", # dict of destination -> { key -> Edu } "keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu] "edus", # dict of destination -> [Edu]
"failures", # dict of destination -> [failures]
"device_destinations", # set of destinations "device_destinations", # set of destinations
)) ))
@ -503,7 +447,6 @@ def process_rows_for_federation(transaction_queue, rows):
presence=[], presence=[],
keyed_edus={}, keyed_edus={},
edus={}, edus={},
failures={},
device_destinations=set(), device_destinations=set(),
) )
@ -532,9 +475,5 @@ def process_rows_for_federation(transaction_queue, rows):
edu.destination, edu.edu_type, edu.content, key=None, edu.destination, edu.edu_type, edu.content, key=None,
) )
for destination, failure_list in iteritems(buff.failures):
for failure in failure_list:
transaction_queue.send_failure(destination, failure)
for destination in buff.device_destinations: for destination in buff.device_destinations:
transaction_queue.send_device_messages(destination) transaction_queue.send_device_messages(destination)

View file

@ -116,9 +116,6 @@ class TransactionQueue(object):
), ),
) )
# destination -> list of tuple(failure, deferred)
self.pending_failures_by_dest = {}
# destination -> stream_id of last successfully sent to-device message. # destination -> stream_id of last successfully sent to-device message.
# NB: may be a long or an int. # NB: may be a long or an int.
self.last_device_stream_id_by_dest = {} self.last_device_stream_id_by_dest = {}
@ -382,19 +379,6 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination) self._attempt_new_transaction(destination)
def send_failure(self, failure, destination):
if destination == self.server_name or destination == "localhost":
return
if not self.can_send_to(destination):
return
self.pending_failures_by_dest.setdefault(
destination, []
).append(failure)
self._attempt_new_transaction(destination)
def send_device_messages(self, destination): def send_device_messages(self, destination):
if destination == self.server_name or destination == "localhost": if destination == self.server_name or destination == "localhost":
return return
@ -469,7 +453,6 @@ class TransactionQueue(object):
pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_presence = self.pending_presence_by_dest.pop(destination, {}) pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_failures = self.pending_failures_by_dest.pop(destination, [])
pending_edus.extend( pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values() self.pending_edus_keyed_by_dest.pop(destination, {}).values()
@ -497,7 +480,7 @@ class TransactionQueue(object):
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus)) destination, len(pending_pdus))
if not pending_pdus and not pending_edus and not pending_failures: if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", destination) logger.debug("TX [%s] Nothing to send", destination)
self.last_device_stream_id_by_dest[destination] = ( self.last_device_stream_id_by_dest[destination] = (
device_stream_id device_stream_id
@ -507,7 +490,7 @@ class TransactionQueue(object):
# END CRITICAL SECTION # END CRITICAL SECTION
success = yield self._send_new_transaction( success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures, destination, pending_pdus, pending_edus,
) )
if success: if success:
sent_transactions_counter.inc() sent_transactions_counter.inc()
@ -584,14 +567,12 @@ class TransactionQueue(object):
@measure_func("_send_new_transaction") @measure_func("_send_new_transaction")
@defer.inlineCallbacks @defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus, def _send_new_transaction(self, destination, pending_pdus, pending_edus):
pending_failures):
# Sort based on the order field # Sort based on the order field
pending_pdus.sort(key=lambda t: t[1]) pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus] pdus = [x[0] for x in pending_pdus]
edus = pending_edus edus = pending_edus
failures = [x.get_dict() for x in pending_failures]
success = True success = True
@ -601,11 +582,10 @@ class TransactionQueue(object):
logger.debug( logger.debug(
"TX [%s] {%s} Attempting new transaction" "TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)", " (pdus: %d, edus: %d)",
destination, txn_id, destination, txn_id,
len(pdus), len(pdus),
len(edus), len(edus),
len(failures)
) )
logger.debug("TX [%s] Persisting transaction...", destination) logger.debug("TX [%s] Persisting transaction...", destination)
@ -617,7 +597,6 @@ class TransactionQueue(object):
destination=destination, destination=destination,
pdus=pdus, pdus=pdus,
edus=edus, edus=edus,
pdu_failures=failures,
) )
self._next_txn_id += 1 self._next_txn_id += 1
@ -627,12 +606,11 @@ class TransactionQueue(object):
logger.debug("TX [%s] Persisted transaction", destination) logger.debug("TX [%s] Persisted transaction", destination)
logger.info( logger.info(
"TX [%s] {%s} Sending transaction [%s]," "TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d, failures: %d)", " (PDUs: %d, EDUs: %d)",
destination, txn_id, destination, txn_id,
transaction.transaction_id, transaction.transaction_id,
len(pdus), len(pdus),
len(edus), len(edus),
len(failures),
) )
# Actually send the transaction # Actually send the transaction

View file

@ -283,11 +283,10 @@ class FederationSendServlet(BaseFederationServlet):
) )
logger.info( logger.info(
"Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)", "Received txn %s from %s. (PDUs: %d, EDUs: %d)",
transaction_id, origin, transaction_id, origin,
len(transaction_data.get("pdus", [])), len(transaction_data.get("pdus", [])),
len(transaction_data.get("edus", [])), len(transaction_data.get("edus", [])),
len(transaction_data.get("failures", [])),
) )
# We should ideally be getting this from the security layer. # We should ideally be getting this from the security layer.

View file

@ -73,7 +73,6 @@ class Transaction(JsonEncodedObject):
"previous_ids", "previous_ids",
"pdus", "pdus",
"edus", "edus",
"pdu_failures",
] ]
internal_keys = [ internal_keys = [

View file

@ -44,7 +44,6 @@ def _expect_edu(destination, edu_type, content, origin="test"):
"content": content, "content": content,
} }
], ],
"pdu_failures": [],
} }