diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index d9f91ccc4e..61da0e89e6 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -42,7 +42,15 @@ class DeviceInboxStore(SQLBaseStore): inserted. """ - def add_messages_to_device_federation_outbox(txn, now_ms, stream_id): + def add_messages_txn(txn, now_ms, stream_id): + # Add the local messages directly to the local inbox. + self._add_messages_to_local_device_inbox_txn( + txn, stream_id, local_messages_by_user_then_device + ) + + # Add the remote messages to the federation outbox. + # We'll send them to a remote server when we next send a + # federation transaction to that destination. sql = ( "INSERT INTO device_federation_outbox" " (destination, stream_id, queued_ts, messages_json)" @@ -52,15 +60,8 @@ class DeviceInboxStore(SQLBaseStore): for destination, edu in remote_messages_by_destination.items(): edu_json = ujson.dumps(edu) rows.append((destination, stream_id, now_ms, edu_json)) - txn.executemany(sql, rows) - def add_messages_txn(txn, now_ms, stream_id): - self._add_messages_to_local_device_inbox_txn( - txn, stream_id, local_messages_by_user_then_device - ) - add_messages_to_device_federation_outbox(txn, now_ms, stream_id) - with self._device_inbox_id_gen.get_next() as stream_id: now_ms = self.clock.time_msec() yield self.runInteraction( @@ -77,6 +78,9 @@ class DeviceInboxStore(SQLBaseStore): self, origin, message_id, local_messages_by_user_then_device ): def add_messages_txn(txn, now_ms, stream_id): + # Check if we've already inserted a matching message_id for that + # origin. This can happen if the origin doesn't receive our + # acknowledgement from the first time we received the message. already_inserted = self._simple_select_one_txn( txn, table="device_federation_inbox", keyvalues={"origin": origin, "message_id": message_id}, @@ -86,6 +90,8 @@ class DeviceInboxStore(SQLBaseStore): if already_inserted is not None: return + # Add an entry for this message_id so that we know we've processed + # it. self._simple_insert_txn( txn, table="device_federation_inbox", values={ @@ -95,6 +101,8 @@ class DeviceInboxStore(SQLBaseStore): }, ) + # Add the messages to the approriate local device inboxes so that + # they'll be sent to the devices when they next sync. self._add_messages_to_local_device_inbox_txn( txn, stream_id, local_messages_by_user_then_device )