diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 145c01f3a..477e16e0f 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -23,7 +23,6 @@ from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.http.site import SynapseSite from synapse.federation import send_queue -from synapse.federation.units import Edu from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.events import SlavedEventStore @@ -33,7 +32,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine -from synapse.storage.presence import UserPresenceState from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -277,69 +275,7 @@ class FederationSenderHandler(object): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": - # The federation stream containis a bunch of different types of - # rows that need to be handled differently. We parse the rows, put - # them into the appropriate collection and then send them off. - presence_to_send = {} - keyed_edus = {} - edus = {} - failures = {} - device_destinations = set() - - # Parse the rows in the stream - for row in rows: - typ = row.type - content = row.data - - if typ == send_queue.PRESENCE_TYPE: - destination = content["destination"] - state = UserPresenceState.from_dict(content["state"]) - - presence_to_send.setdefault(destination, []).append(state) - elif typ == send_queue.KEYED_EDU_TYPE: - key = content["key"] - edu = Edu(**content["edu"]) - - keyed_edus.setdefault( - edu.destination, {} - )[(edu.destination, tuple(key))] = edu - elif typ == send_queue.EDU_TYPE: - edu = Edu(**content) - - edus.setdefault(edu.destination, []).append(edu) - elif typ == send_queue.FAILURE_TYPE: - destination = content["destination"] - failure = content["failure"] - - failures.setdefault(destination, []).append(failure) - elif typ == send_queue.DEVICE_MESSAGE_TYPE: - device_destinations.add(content["destination"]) - else: - raise Exception("Unrecognised federation type: %r", typ) - - # We've finished collecting, send everything off - for destination, states in presence_to_send.items(): - self.federation_sender.send_presence(destination, states) - - for destination, edu_map in keyed_edus.items(): - for key, edu in edu_map.items(): - self.federation_sender.send_edu( - edu.destination, edu.edu_type, edu.content, key=key, - ) - - for destination, edu_list in edus.items(): - for edu in edu_list: - self.federation_sender.send_edu( - edu.destination, edu.edu_type, edu.content, key=None, - ) - - for destination, failure_list in failures.items(): - for failure in failure_list: - self.federation_sender.send_failure(destination, failure) - - for destination in device_destinations: - self.federation_sender.send_device_messages(destination) - + send_queue.process_rows_for_federation(self.federation_sender, rows) preserve_fn(self.update_token)(token) # We also need to poke the federation sender when new events happen diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 78c852ed6..748548bbe 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -31,22 +31,21 @@ Events are replicated via a separate events stream. from .units import Edu +from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure import synapse.metrics from blist import sorteddict +from collections import namedtuple + +import logging + +logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) -PRESENCE_TYPE = "p" -KEYED_EDU_TYPE = "k" -EDU_TYPE = "e" -FAILURE_TYPE = "f" -DEVICE_MESSAGE_TYPE = "d" - - class FederationRemoteSendQueue(object): """A drop in replacement for TransactionQueue""" @@ -239,6 +238,8 @@ class FederationRemoteSendQueue(object): if from_token > self.pos: from_token = -1 + # list of tuple(int, BaseFederationRow), where the first is the position + # of the federation stream. rows = [] # There should be only one reader, so lets delete everything its @@ -257,10 +258,10 @@ class FederationRemoteSendQueue(object): ) for (key, (dest, user_id)) in dest_user_ids: - rows.append((key, PRESENCE_TYPE, { - "destination": dest, - "state": self.presence_map[user_id].as_dict(), - })) + rows.append((key, PresenceRow( + destination=dest, + state=self.presence_map[user_id], + ))) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() @@ -269,12 +270,10 @@ class FederationRemoteSendQueue(object): keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) for (pos, (destination, edu_key)) in keyed_edus: - rows.append( - (pos, KEYED_EDU_TYPE, { - "key": edu_key, - "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), - }) - ) + rows.append((pos, KeyedEduRow( + key=edu_key, + edu=self.keyed_edu[(destination, edu_key)], + ))) # Fetch changed edus keys = self.edus.keys() @@ -283,7 +282,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, edu.get_internal_dict())) + rows.append((pos, EduRow(edu))) # Fetch changed failures keys = self.failures.keys() @@ -292,10 +291,10 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: - rows.append((pos, FAILURE_TYPE, { - "destination": destination, - "failure": failure, - })) + rows.append((pos, FailureRow( + destination=destination, + failure=failure, + ))) # Fetch changed device messages keys = self.device_messages.keys() @@ -304,11 +303,229 @@ class FederationRemoteSendQueue(object): device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: - rows.append((pos, DEVICE_MESSAGE_TYPE, { - "destination": destination, - })) + rows.append((pos, DeviceRow( + destination=destination, + ))) # Sort rows based on pos rows.sort() - return rows + return [(pos, row.TypeId, row.to_data()) for pos, row in rows] + + +class BaseFederationRow(object): + """Base class for rows to be sent in the federation stream. + + Specifies how to identify, serialize and deserialize the different types. + """ + + TypeId = None # Unique string that ids the type. Must be overriden in sub classes. + + @staticmethod + def from_data(data): + """Parse the data from the federation stream into a row. + + Args: + data: The value of ``data`` from FederationStreamRow.data, type + depends on the type of stream + """ + raise NotImplementedError() + + def to_data(self): + """Serialize this row to be sent over the federation stream. + + Returns: + The value to be sent in FederationStreamRow.data. The type depends + on the type of stream. + """ + raise NotImplementedError() + + def add_to_buffer(self, buff): + """Add this row to the appropriate field in the buffer ready for this + to be sent over federation. + + We use a buffer so that we can batch up events that have come in at + the same time and send them all at once. + + Args: + buff (BufferedToSend) + """ + raise NotImplementedError() + + +class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( + "destination", # str + "state", # UserPresenceState +))): + TypeId = "p" + + @staticmethod + def from_data(data): + return PresenceRow( + destination=data["destination"], + state=UserPresenceState.from_dict(data["state"]) + ) + + def to_data(self): + return { + "destination": self.destination, + "state": self.state.as_dict() + } + + def add_to_buffer(self, buff): + buff.presence.setdefault(self.destination, []).append(self.state) + + +class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( + "key", # tuple(str) - the edu key passed to send_edu + "edu", # Edu +))): + TypeId = "k" + + @staticmethod + def from_data(data): + return KeyedEduRow( + key=tuple(data["key"]), + edu=Edu(**data["edu"]), + ) + + def to_data(self): + return { + "key": self.key, + "edu": self.edu.get_internal_dict(), + } + + def add_to_buffer(self, buff): + buff.keyed_edus.setdefault( + self.edu.destination, {} + )[self.key] = self.edu + + +class EduRow(BaseFederationRow, namedtuple("EduRow", ( + "edu", # Edu +))): + TypeId = "e" + + @staticmethod + def from_data(data): + return EduRow(Edu(**data)) + + def to_data(self): + return self.edu.get_internal_dict() + + def add_to_buffer(self, buff): + buff.edus.setdefault(self.edu.destination, []).append(self.edu) + + +class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( + "destination", # str + "failure", +))): + TypeId = "f" + + @staticmethod + def from_data(data): + return FailureRow( + destination=data["destination"], + failure=data["failure"], + ) + + def to_data(self): + return { + "destination": self.destination, + "failure": self.failure, + } + + def add_to_buffer(self, buff): + buff.failures.setdefault(self.destination, []).append(self.failure) + + +class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( + "destination", # str +))): + TypeId = "d" + + @staticmethod + def from_data(data): + return DeviceRow(destination=data["destination"]) + + def to_data(self): + return {"destination": self.destination} + + def add_to_buffer(self, buff): + buff.device_destinations.add(self.destination) + + +TypeToRow = { + Row.TypeId: Row + for Row in ( + PresenceRow, + KeyedEduRow, + EduRow, + FailureRow, + DeviceRow, + ) +} + + +ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( + "presence", # dict of destination -> [UserPresenceState] + "keyed_edus", # dict of destination -> { key -> Edu } + "edus", # dict of destination -> [Edu] + "failures", # dict of destination -> [failures] + "device_destinations", # set of destinations +)) + + +def process_rows_for_federation(transaction_queue, rows): + """Parse a list of rows from the federation stream and put them in the + transaction queue ready for sending to the relevant homeservers. + + Args: + transaction_queue (TransactionQueue) + rows (list(synapse.replication.tcp.streams.FederationStreamRow)) + """ + + # The federation stream contains a bunch of different types of + # rows that need to be handled differently. We parse the rows, put + # them into the appropriate collection and then send them off. + + buff = ParsedFederationStreamData( + presence={}, + keyed_edus={}, + edus={}, + failures={}, + device_destinations=set(), + ) + + # Parse the rows in the stream and add to the buffer + for row in rows: + if row.type not in TypeToRow: + logger.error("Unrecognized federation row type %r", row.type) + continue + + RowType = TypeToRow[row.type] + parsed_row = RowType.from_data(row.data) + parsed_row.add_to_buffer(buff) + + for destination, states in buff.presence.iteritems(): + transaction_queue.send_presence(destination, states) + + for destination, edu_map in buff.keyed_edus.iteritems(): + for key, edu in edu_map.items(): + transaction_queue.send_edu( + edu.destination, edu.edu_type, edu.content, key=key, + ) + + for destination, edu_list in buff.edus.iteritems(): + for edu in edu_list: + transaction_queue.send_edu( + edu.destination, edu.edu_type, edu.content, key=None, + ) + + for destination, failure_list in buff.failures.iteritems(): + for failure in failure_list: + transaction_queue.send_failure(destination, failure) + + for destination in buff.device_destinations: + transaction_queue.send_device_messages(destination) diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 967b459e0..369d5f242 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -98,8 +98,8 @@ ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( "entity", # str )) FederationStreamRow = namedtuple("FederationStreamRow", ( - "type", # str - "data", # dict + "type", # str, the type of data as defined in the BaseFederationRows + "data", # dict, serialization of a federation.send_queue.BaseFederationRow )) TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( "user_id", # str