forked from MirrorHub/synapse
Include the room ID in more purge room log lines. (#15222)
This commit is contained in:
parent
f4fc83ac75
commit
88efc75bab
5 changed files with 23 additions and 15 deletions
1
changelog.d/15222.misc
Normal file
1
changelog.d/15222.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Improve log lines when purging rooms.
|
|
@ -683,7 +683,7 @@ class PaginationHandler:
|
||||||
|
|
||||||
await self._storage_controllers.purge_events.purge_room(room_id)
|
await self._storage_controllers.purge_events.purge_room(room_id)
|
||||||
|
|
||||||
logger.info("complete")
|
logger.info("purge complete for room_id %s", room_id)
|
||||||
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
|
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
|
||||||
except Exception:
|
except Exception:
|
||||||
f = Failure()
|
f = Failure()
|
||||||
|
|
|
@ -16,6 +16,7 @@ import itertools
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Set
|
from typing import TYPE_CHECKING, Set
|
||||||
|
|
||||||
|
from synapse.logging.context import nested_logging_context
|
||||||
from synapse.storage.databases import Databases
|
from synapse.storage.databases import Databases
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -33,8 +34,9 @@ class PurgeEventsStorageController:
|
||||||
async def purge_room(self, room_id: str) -> None:
|
async def purge_room(self, room_id: str) -> None:
|
||||||
"""Deletes all record of a room"""
|
"""Deletes all record of a room"""
|
||||||
|
|
||||||
state_groups_to_delete = await self.stores.main.purge_room(room_id)
|
with nested_logging_context(room_id):
|
||||||
await self.stores.state.purge_room_state(room_id, state_groups_to_delete)
|
state_groups_to_delete = await self.stores.main.purge_room(room_id)
|
||||||
|
await self.stores.state.purge_room_state(room_id, state_groups_to_delete)
|
||||||
|
|
||||||
async def purge_history(
|
async def purge_history(
|
||||||
self, room_id: str, token: str, delete_local_events: bool
|
self, room_id: str, token: str, delete_local_events: bool
|
||||||
|
@ -51,15 +53,17 @@ class PurgeEventsStorageController:
|
||||||
(instead of just marking them as outliers and deleting their
|
(instead of just marking them as outliers and deleting their
|
||||||
state groups).
|
state groups).
|
||||||
"""
|
"""
|
||||||
state_groups = await self.stores.main.purge_history(
|
with nested_logging_context(room_id):
|
||||||
room_id, token, delete_local_events
|
state_groups = await self.stores.main.purge_history(
|
||||||
)
|
room_id, token, delete_local_events
|
||||||
|
)
|
||||||
|
|
||||||
logger.info("[purge] finding state groups that can be deleted")
|
logger.info("[purge] finding state groups that can be deleted")
|
||||||
|
sg_to_delete = await self._find_unreferenced_groups(state_groups)
|
||||||
|
|
||||||
sg_to_delete = await self._find_unreferenced_groups(state_groups)
|
await self.stores.state.purge_unreferenced_state_groups(
|
||||||
|
room_id, sg_to_delete
|
||||||
await self.stores.state.purge_unreferenced_state_groups(room_id, sg_to_delete)
|
)
|
||||||
|
|
||||||
async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
|
async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
|
||||||
"""Used when purging history to figure out which state groups can be
|
"""Used when purging history to figure out which state groups can be
|
||||||
|
|
|
@ -325,6 +325,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
# We then run the same purge a second time without this isolation level to
|
# We then run the same purge a second time without this isolation level to
|
||||||
# purge any of those rows which were added during the first.
|
# purge any of those rows which were added during the first.
|
||||||
|
|
||||||
|
logger.info("[purge] Starting initial main purge of [1/2]")
|
||||||
state_groups_to_delete = await self.db_pool.runInteraction(
|
state_groups_to_delete = await self.db_pool.runInteraction(
|
||||||
"purge_room",
|
"purge_room",
|
||||||
self._purge_room_txn,
|
self._purge_room_txn,
|
||||||
|
@ -332,6 +333,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
isolation_level=IsolationLevel.READ_COMMITTED,
|
isolation_level=IsolationLevel.READ_COMMITTED,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
logger.info("[purge] Starting secondary main purge of [2/2]")
|
||||||
state_groups_to_delete.extend(
|
state_groups_to_delete.extend(
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
"purge_room",
|
"purge_room",
|
||||||
|
@ -339,6 +341,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
logger.info("[purge] Done with main purge")
|
||||||
|
|
||||||
return state_groups_to_delete
|
return state_groups_to_delete
|
||||||
|
|
||||||
|
@ -376,7 +379,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
)
|
)
|
||||||
referenced_chain_id_tuples = list(txn)
|
referenced_chain_id_tuples = list(txn)
|
||||||
|
|
||||||
logger.info("[purge] removing events from event_auth_chain_links")
|
logger.info("[purge] removing from event_auth_chain_links")
|
||||||
txn.executemany(
|
txn.executemany(
|
||||||
"""
|
"""
|
||||||
DELETE FROM event_auth_chain_links WHERE
|
DELETE FROM event_auth_chain_links WHERE
|
||||||
|
@ -399,7 +402,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
"rejections",
|
"rejections",
|
||||||
"state_events",
|
"state_events",
|
||||||
):
|
):
|
||||||
logger.info("[purge] removing %s from %s", room_id, table)
|
logger.info("[purge] removing from %s", table)
|
||||||
|
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"""
|
"""
|
||||||
|
@ -454,7 +457,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
# happy
|
# happy
|
||||||
"rooms",
|
"rooms",
|
||||||
):
|
):
|
||||||
logger.info("[purge] removing %s from %s", room_id, table)
|
logger.info("[purge] removing from %s", table)
|
||||||
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
|
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
|
||||||
|
|
||||||
# Other tables we do NOT need to clear out:
|
# Other tables we do NOT need to clear out:
|
||||||
|
@ -486,6 +489,4 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
# that already exist.
|
# that already exist.
|
||||||
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
|
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
|
||||||
|
|
||||||
logger.info("[purge] done")
|
|
||||||
|
|
||||||
return state_groups
|
return state_groups
|
||||||
|
|
|
@ -805,12 +805,14 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||||
state_groups_to_delete: State groups to delete
|
state_groups_to_delete: State groups to delete
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
logger.info("[purge] Starting state purge")
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
"purge_room_state",
|
"purge_room_state",
|
||||||
self._purge_room_state_txn,
|
self._purge_room_state_txn,
|
||||||
room_id,
|
room_id,
|
||||||
state_groups_to_delete,
|
state_groups_to_delete,
|
||||||
)
|
)
|
||||||
|
logger.info("[purge] Done with state purge")
|
||||||
|
|
||||||
def _purge_room_state_txn(
|
def _purge_room_state_txn(
|
||||||
self,
|
self,
|
||||||
|
|
Loading…
Reference in a new issue