mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-14 19:43:50 +01:00
Clean out invalid destinations from outbox (#17242)
We started ensuring we only insert valid destinations: https://github.com/element-hq/synapse/pull/17240
This commit is contained in:
parent
8bd9ff0783
commit
225f378ffa
3 changed files with 92 additions and 0 deletions
1
changelog.d/17242.misc
Normal file
1
changelog.d/17242.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Clean out invalid destinations from `device_federation_outbox` table.
|
|
@ -58,6 +58,7 @@ from synapse.types import JsonDict
|
||||||
from synapse.util import json_encoder
|
from synapse.util import json_encoder
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
from synapse.util.stringutils import parse_and_validate_server_name
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
|
@ -964,6 +965,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||||
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||||
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
|
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
|
||||||
REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
|
REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
|
||||||
|
CLEANUP_DEVICE_FEDERATION_OUTBOX = "cleanup_device_federation_outbox"
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
@ -989,6 +991,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||||
self._remove_dead_devices_from_device_inbox,
|
self._remove_dead_devices_from_device_inbox,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.db_pool.updates.register_background_update_handler(
|
||||||
|
self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
|
||||||
|
self._cleanup_device_federation_outbox,
|
||||||
|
)
|
||||||
|
|
||||||
async def _background_drop_index_device_inbox(
|
async def _background_drop_index_device_inbox(
|
||||||
self, progress: JsonDict, batch_size: int
|
self, progress: JsonDict, batch_size: int
|
||||||
) -> int:
|
) -> int:
|
||||||
|
@ -1080,6 +1087,75 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||||
|
|
||||||
return batch_size
|
return batch_size
|
||||||
|
|
||||||
|
async def _cleanup_device_federation_outbox(
|
||||||
|
self,
|
||||||
|
progress: JsonDict,
|
||||||
|
batch_size: int,
|
||||||
|
) -> int:
|
||||||
|
def _cleanup_device_federation_outbox_txn(
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
) -> bool:
|
||||||
|
if "max_stream_id" in progress:
|
||||||
|
max_stream_id = progress["max_stream_id"]
|
||||||
|
else:
|
||||||
|
txn.execute("SELECT max(stream_id) FROM device_federation_outbox")
|
||||||
|
res = cast(Tuple[Optional[int]], txn.fetchone())
|
||||||
|
if res[0] is None:
|
||||||
|
# this can only happen if the `device_inbox` table is empty, in which
|
||||||
|
# case we have no work to do.
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
max_stream_id = res[0]
|
||||||
|
|
||||||
|
start = progress.get("stream_id", 0)
|
||||||
|
stop = start + batch_size
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
SELECT destination FROM device_federation_outbox
|
||||||
|
WHERE ? < stream_id AND stream_id <= ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (start, stop))
|
||||||
|
|
||||||
|
destinations = {d for d, in txn}
|
||||||
|
to_remove = set()
|
||||||
|
for d in destinations:
|
||||||
|
try:
|
||||||
|
parse_and_validate_server_name(d)
|
||||||
|
except ValueError:
|
||||||
|
to_remove.add(d)
|
||||||
|
|
||||||
|
self.db_pool.simple_delete_many_txn(
|
||||||
|
txn,
|
||||||
|
table="device_federation_outbox",
|
||||||
|
column="destination",
|
||||||
|
values=to_remove,
|
||||||
|
keyvalues={},
|
||||||
|
)
|
||||||
|
|
||||||
|
self.db_pool.updates._background_update_progress_txn(
|
||||||
|
txn,
|
||||||
|
self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
|
||||||
|
{
|
||||||
|
"stream_id": stop,
|
||||||
|
"max_stream_id": max_stream_id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
return stop >= max_stream_id
|
||||||
|
|
||||||
|
finished = await self.db_pool.runInteraction(
|
||||||
|
"_cleanup_device_federation_outbox",
|
||||||
|
_cleanup_device_federation_outbox_txn,
|
||||||
|
)
|
||||||
|
|
||||||
|
if finished:
|
||||||
|
await self.db_pool.updates._end_background_update(
|
||||||
|
self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
|
||||||
|
)
|
||||||
|
|
||||||
|
return batch_size
|
||||||
|
|
||||||
|
|
||||||
class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
|
class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
--
|
||||||
|
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||||
|
--
|
||||||
|
-- Copyright (C) 2024 New Vector, Ltd
|
||||||
|
--
|
||||||
|
-- This program is free software: you can redistribute it and/or modify
|
||||||
|
-- it under the terms of the GNU Affero General Public License as
|
||||||
|
-- published by the Free Software Foundation, either version 3 of the
|
||||||
|
-- License, or (at your option) any later version.
|
||||||
|
--
|
||||||
|
-- See the GNU Affero General Public License for more details:
|
||||||
|
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
|
|
||||||
|
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||||
|
(8504, 'cleanup_device_federation_outbox', '{}');
|
Loading…
Reference in a new issue