mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 12:33:52 +01:00
Comment the add_messages storage functions
This commit is contained in:
parent
74cbfdc7de
commit
7d893beebe
1 changed files with 16 additions and 8 deletions
|
@ -42,7 +42,15 @@ class DeviceInboxStore(SQLBaseStore):
|
||||||
inserted.
|
inserted.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def add_messages_to_device_federation_outbox(txn, now_ms, stream_id):
|
def add_messages_txn(txn, now_ms, stream_id):
|
||||||
|
# Add the local messages directly to the local inbox.
|
||||||
|
self._add_messages_to_local_device_inbox_txn(
|
||||||
|
txn, stream_id, local_messages_by_user_then_device
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add the remote messages to the federation outbox.
|
||||||
|
# We'll send them to a remote server when we next send a
|
||||||
|
# federation transaction to that destination.
|
||||||
sql = (
|
sql = (
|
||||||
"INSERT INTO device_federation_outbox"
|
"INSERT INTO device_federation_outbox"
|
||||||
" (destination, stream_id, queued_ts, messages_json)"
|
" (destination, stream_id, queued_ts, messages_json)"
|
||||||
|
@ -52,15 +60,8 @@ class DeviceInboxStore(SQLBaseStore):
|
||||||
for destination, edu in remote_messages_by_destination.items():
|
for destination, edu in remote_messages_by_destination.items():
|
||||||
edu_json = ujson.dumps(edu)
|
edu_json = ujson.dumps(edu)
|
||||||
rows.append((destination, stream_id, now_ms, edu_json))
|
rows.append((destination, stream_id, now_ms, edu_json))
|
||||||
|
|
||||||
txn.executemany(sql, rows)
|
txn.executemany(sql, rows)
|
||||||
|
|
||||||
def add_messages_txn(txn, now_ms, stream_id):
|
|
||||||
self._add_messages_to_local_device_inbox_txn(
|
|
||||||
txn, stream_id, local_messages_by_user_then_device
|
|
||||||
)
|
|
||||||
add_messages_to_device_federation_outbox(txn, now_ms, stream_id)
|
|
||||||
|
|
||||||
with self._device_inbox_id_gen.get_next() as stream_id:
|
with self._device_inbox_id_gen.get_next() as stream_id:
|
||||||
now_ms = self.clock.time_msec()
|
now_ms = self.clock.time_msec()
|
||||||
yield self.runInteraction(
|
yield self.runInteraction(
|
||||||
|
@ -77,6 +78,9 @@ class DeviceInboxStore(SQLBaseStore):
|
||||||
self, origin, message_id, local_messages_by_user_then_device
|
self, origin, message_id, local_messages_by_user_then_device
|
||||||
):
|
):
|
||||||
def add_messages_txn(txn, now_ms, stream_id):
|
def add_messages_txn(txn, now_ms, stream_id):
|
||||||
|
# Check if we've already inserted a matching message_id for that
|
||||||
|
# origin. This can happen if the origin doesn't receive our
|
||||||
|
# acknowledgement from the first time we received the message.
|
||||||
already_inserted = self._simple_select_one_txn(
|
already_inserted = self._simple_select_one_txn(
|
||||||
txn, table="device_federation_inbox",
|
txn, table="device_federation_inbox",
|
||||||
keyvalues={"origin": origin, "message_id": message_id},
|
keyvalues={"origin": origin, "message_id": message_id},
|
||||||
|
@ -86,6 +90,8 @@ class DeviceInboxStore(SQLBaseStore):
|
||||||
if already_inserted is not None:
|
if already_inserted is not None:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Add an entry for this message_id so that we know we've processed
|
||||||
|
# it.
|
||||||
self._simple_insert_txn(
|
self._simple_insert_txn(
|
||||||
txn, table="device_federation_inbox",
|
txn, table="device_federation_inbox",
|
||||||
values={
|
values={
|
||||||
|
@ -95,6 +101,8 @@ class DeviceInboxStore(SQLBaseStore):
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Add the messages to the approriate local device inboxes so that
|
||||||
|
# they'll be sent to the devices when they next sync.
|
||||||
self._add_messages_to_local_device_inbox_txn(
|
self._add_messages_to_local_device_inbox_txn(
|
||||||
txn, stream_id, local_messages_by_user_then_device
|
txn, stream_id, local_messages_by_user_then_device
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue