From 11880103b13146aa8e700827f36c81a26fb8e09e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:11:17 +0100 Subject: [PATCH] Make federation send queue take the current position --- synapse/federation/send_queue.py | 40 ++++++++++++++++++++------------ synapse/replication/resource.py | 2 +- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index bbb019522..4bde66fbf 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -220,10 +220,15 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def get_replication_rows(self, token, limit, federation_ack=None): - """ + def federation_ack(self, token): + self._clear_queue_before_pos(token) + + def get_replication_rows(self, from_token, to_token, limit, federation_ack=None): + """Get rows to be sent over federation between the two tokens + Args: - token (int) + from_token (int) + to_token(int) limit (int) federation_ack (int): Optional. The position where the worker is explicitly acknowledged it has handled. Allows us to drop @@ -232,8 +237,8 @@ class FederationRemoteSendQueue(object): # TODO: Handle limit. # To handle restarts where we wrap around - if token > self.pos: - token = -1 + if from_token > self.pos: + from_token = -1 rows = [] @@ -244,10 +249,11 @@ class FederationRemoteSendQueue(object): # Fetch changed presence keys = self.presence_changed.keys() - i = keys.bisect_right(token) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 dest_user_ids = set( (pos, dest_user_id) - for pos in keys[i:] + for pos in keys[i:j] for dest_user_id in self.presence_changed[pos] ) @@ -259,8 +265,9 @@ class FederationRemoteSendQueue(object): # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() - i = keys.bisect_right(token) - keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) for (pos, (destination, edu_key)) in keyed_edus: rows.append( @@ -272,16 +279,18 @@ class FederationRemoteSendQueue(object): # Fetch changed edus keys = self.edus.keys() - i = keys.bisect_right(token) - edus = set((k, self.edus[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) # Fetch changed failures keys = self.failures.keys() - i = keys.bisect_right(token) - failures = set((k, self.failures[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: rows.append((pos, FAILURE_TYPE, ujson.dumps({ @@ -291,8 +300,9 @@ class FederationRemoteSendQueue(object): # Fetch changed device messages keys = self.device_messages.keys() - i = keys.bisect_right(token) - device_messages = set((k, self.device_messages[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 2d3ec2eca..abd3fe766 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -489,7 +489,7 @@ class ReplicationResource(Resource): if federation is not None and federation != current_position: federation_rows = self.federation_sender.get_replication_rows( - federation, limit, federation_ack=federation_ack, + federation, current_position, limit, federation_ack=federation_ack, ) upto_token = _position_from_rows(federation_rows, current_position) writer.write_header_and_rows("federation", federation_rows, (