forked from MirrorHub/synapse
Add storage methods for federated device messages
This commit is contained in:
parent
2ad72da931
commit
e020834e4f
1 changed files with 132 additions and 7 deletions
|
@ -27,28 +27,89 @@ logger = logging.getLogger(__name__)
|
|||
class DeviceInboxStore(SQLBaseStore):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_messages_to_device_inbox(self, messages_by_user_then_device):
|
||||
"""
|
||||
def add_messages_to_device_inbox(self, local_messages_by_user_then_device,
|
||||
remote_messages_by_destination):
|
||||
"""Used to send messages from this server.
|
||||
|
||||
Args:
|
||||
messages_by_user_and_device(dict):
|
||||
sender_user_id(str): The ID of the user sending these messages.
|
||||
local_messages_by_user_and_device(dict):
|
||||
Dictionary of user_id to device_id to message.
|
||||
remote_messages_by_destination(dict):
|
||||
Dictionary of destination server_name to the EDU JSON to send.
|
||||
Returns:
|
||||
A deferred stream_id that resolves when the messages have been
|
||||
inserted.
|
||||
"""
|
||||
|
||||
def add_messages_to_device_federation_outbox(txn, now_ms, stream_id):
|
||||
sql = (
|
||||
"INSERT INTO device_federation_outbox"
|
||||
" (destination, stream_id, queued_ts, messages_json)"
|
||||
" VALUES (?,?,?,?)"
|
||||
)
|
||||
rows = []
|
||||
for destination, edu in remote_messages_by_destination.items():
|
||||
edu_json = ujson.dumps(edu)
|
||||
rows.append((destination, stream_id, now_ms, edu_json))
|
||||
|
||||
txn.executemany(sql, rows)
|
||||
|
||||
def add_messages_txn(txn, now_ms, stream_id):
|
||||
self._add_messages_to_local_device_inbox_txn(
|
||||
txn, stream_id, local_messages_by_user_then_device
|
||||
)
|
||||
add_messages_to_device_federation_outbox(now_ms, stream_id)
|
||||
|
||||
with self._device_inbox_id_gen.get_next() as stream_id:
|
||||
now_ms = self.clock.time_now_ms()
|
||||
yield self.runInteraction(
|
||||
"add_messages_to_device_inbox",
|
||||
self._add_messages_to_device_inbox_txn,
|
||||
add_messages_txn,
|
||||
now_ms,
|
||||
stream_id,
|
||||
messages_by_user_then_device,
|
||||
)
|
||||
|
||||
defer.returnValue(self._device_inbox_id_gen.get_current_token())
|
||||
|
||||
def _add_messages_to_device_inbox_txn(self, txn, stream_id,
|
||||
messages_by_user_then_device):
|
||||
@defer.inlineCallbacks
|
||||
def add_messages_from_remote_to_device_inbox(
|
||||
self, origin, message_id, local_messages_by_user_then_device
|
||||
):
|
||||
def add_messages_txn(txn, now_ms, stream_id):
|
||||
already_inserted = self._simple_select_one_txn(
|
||||
txn, table="device_federation_inbox",
|
||||
keyvalues={"origin": origin, "message_id": message_id},
|
||||
retcols=("message_id",),
|
||||
allow_none=True,
|
||||
)
|
||||
if already_inserted is not None:
|
||||
return
|
||||
|
||||
self._simple_insert_txn(
|
||||
txn, table="device_federation_inbox",
|
||||
values={
|
||||
"origin": origin,
|
||||
"message_id": message_id,
|
||||
"received_ts": now_ms,
|
||||
},
|
||||
)
|
||||
|
||||
self._add_messages_to_local_device_inbox_txn(
|
||||
txn, stream_id, local_messages_by_user_then_device
|
||||
)
|
||||
|
||||
with self._device_inbox_id_gen.get_next() as stream_id:
|
||||
now_ms = self.clock.time_now_ms()
|
||||
yield self.runInteraction(
|
||||
"add_messages_from_remote_to_device_inbox",
|
||||
add_messages_txn,
|
||||
now_ms,
|
||||
stream_id,
|
||||
)
|
||||
|
||||
def _add_messages_to_local_device_inbox_txn(self, txn, stream_id,
|
||||
messages_by_user_then_device):
|
||||
local_users_and_devices = set()
|
||||
for user_id, messages_by_device in messages_by_user_then_device.items():
|
||||
devices = messages_by_device.keys()
|
||||
|
@ -177,3 +238,67 @@ class DeviceInboxStore(SQLBaseStore):
|
|||
|
||||
def get_to_device_stream_token(self):
|
||||
return self._device_inbox_id_gen.get_current_token()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_new_device_messages_for_remote_destination(
|
||||
self, destination, last_stream_id, current_stream_id, limit=100
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
destination(str): The name of the remote server.
|
||||
last_stream_id(int): The last position of the device message stream
|
||||
that the server sent up to.
|
||||
current_stream_id(int): The current position of the device
|
||||
message stream.
|
||||
Returns:
|
||||
Deferred ([dict], int): List of messages for the device and where
|
||||
in the stream the messages got to.
|
||||
"""
|
||||
def get_new_messages_for_remote_destination_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_id, messages_json FROM device_federation_outbox"
|
||||
" WHERE destination = ?"
|
||||
" AND ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
txn.execute(sql, (
|
||||
destination, last_stream_id, current_stream_id, limit
|
||||
))
|
||||
messages = []
|
||||
for row in txn.fetchall():
|
||||
stream_pos = row[0]
|
||||
messages.append(ujson.loads(row[1]))
|
||||
if len(messages) < limit:
|
||||
stream_pos = current_stream_id
|
||||
return (messages, stream_pos)
|
||||
|
||||
return self.runInteraction(
|
||||
"get_new_device_messages_for_remote_destination",
|
||||
get_new_messages_for_remote_destination_txn,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_device_messages_for_remote_destination(self, destination,
|
||||
up_to_stream_id):
|
||||
"""Used to delete messages when the remote destination acknowledges
|
||||
their receipt.
|
||||
|
||||
Args:
|
||||
destination(str): The destination server_name
|
||||
up_to_stream_id(int): Where to delete messages up to.
|
||||
Returns:
|
||||
A deferred that resolves when the messages have been deleted.
|
||||
"""
|
||||
def delete_messages_for_remote_destination_txn(txn):
|
||||
sql = (
|
||||
"DELETE FROM device_federation_outbox"
|
||||
" WHERE destination = ? AND"
|
||||
" AND stream_id <= ?"
|
||||
)
|
||||
txn.execute(sql, (destination, up_to_stream_id))
|
||||
|
||||
return self.runInteraction(
|
||||
"delete_device_messages_for_remote_destination",
|
||||
delete_messages_for_remote_destination_txn
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue