Split purge API into events vs state

This commit is contained in:
Erik Johnston 2019-10-30 15:12:49 +00:00
parent b7fe62b766
commit 7c8c97e635
6 changed files with 308 additions and 184 deletions

View file

@ -69,6 +69,7 @@ class PaginationHandler(object):
self.hs = hs self.hs = hs
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self._server_name = hs.hostname self._server_name = hs.hostname
@ -125,7 +126,9 @@ class PaginationHandler(object):
self._purges_in_progress_by_room.add(room_id) self._purges_in_progress_by_room.add(room_id)
try: try:
with (yield self.pagination_lock.write(room_id)): with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(room_id, token, delete_local_events) yield self.storage.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete") logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception: except Exception:
@ -168,7 +171,7 @@ class PaginationHandler(object):
if joined: if joined:
raise SynapseError(400, "Users are still joined to this room") raise SynapseError(400, "Users are still joined to this room")
await self.store.purge_room(room_id) await self.storage.purge_events.purge_room(room_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_messages( def get_messages(

View file

@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
from synapse.storage.data_stores import DataStores from synapse.storage.data_stores import DataStores
from synapse.storage.data_stores.main import DataStore from synapse.storage.data_stores.main import DataStore
from synapse.storage.persist_events import EventsPersistenceStorage from synapse.storage.persist_events import EventsPersistenceStorage
from synapse.storage.purge_events import PurgeEventsStorage
__all__ = ["DataStores", "DataStore"] __all__ = ["DataStores", "DataStore"]
@ -45,6 +46,7 @@ class Storage(object):
self.main = stores.main self.main = stores.main
self.persistence = EventsPersistenceStorage(hs, stores) self.persistence = EventsPersistenceStorage(hs, stores)
self.purge_events = PurgeEventsStorage(hs, stores)
def are_all_users_on_domain(txn, database_engine, domain): def are_all_users_on_domain(txn, database_engine, domain):

View file

@ -1368,6 +1368,10 @@ class EventsStore(
if True, we will delete local events as well as remote ones if True, we will delete local events as well as remote ones
(instead of just marking them as outliers and deleting their (instead of just marking them as outliers and deleting their
state groups). state groups).
Returns:
Deferred[set[int]]: The set of state groups that reference deleted
events.
""" """
return self.runInteraction( return self.runInteraction(
@ -1521,60 +1525,6 @@ class EventsStore(
"[purge] found %i referenced state groups", len(referenced_state_groups) "[purge] found %i referenced state groups", len(referenced_state_groups)
) )
logger.info("[purge] finding state groups that can be deleted")
_ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
state_groups_to_delete, remaining_state_groups = _
logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
logger.info(
"[purge] de-delta-ing %i remaining state groups",
len(remaining_state_groups),
)
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
self._simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)
self._simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in iteritems(curr_state)
],
)
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)
logger.info("[purge] removing events from event_to_state_groups") logger.info("[purge] removing events from event_to_state_groups")
txn.execute( txn.execute(
"DELETE FROM event_to_state_groups " "DELETE FROM event_to_state_groups "
@ -1661,87 +1611,7 @@ class EventsStore(
logger.info("[purge] done") logger.info("[purge] done")
def _find_unreferenced_groups_during_purge(self, txn, state_groups): return referenced_state_groups
"""Used when purging history to figure out which state groups can be
deleted and which need to be de-delta'ed (due to one of its prev groups
being scheduled for deletion).
Args:
txn
state_groups (set[int]): Set of state groups referenced by events
that are going to be deleted.
Returns:
tuple[set[int], set[int]]: The set of state groups that can be
deleted and the set of state groups that need to be de-delta'ed
"""
# Graph of state group -> previous group
graph = {}
# Set of events that we have found to be referenced by events
referenced_groups = set()
# Set of state groups we've already seen
state_groups_seen = set(state_groups)
# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search
# Check if state groups are referenced
sql = """
SELECT DISTINCT state_group FROM event_to_state_groups
LEFT JOIN events_to_purge AS ep USING (event_id)
WHERE ep.event_id IS NULL AND
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "state_group", current_search
)
txn.execute(sql + clause, list(args))
referenced = set(sg for sg, in txn)
referenced_groups |= referenced
# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=current_search,
keyvalues={},
retcols=("prev_state_group", "state_group"),
)
prevs = set(row["state_group"] for row in rows)
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs
for row in rows:
# Note: Each state group can have at most one prev group
graph[row["state_group"]] = row["prev_state_group"]
to_delete = state_groups_seen - referenced_groups
to_dedelta = set()
for sg in referenced_groups:
prev_sg = graph.get(sg)
if prev_sg and prev_sg in to_delete:
to_dedelta.add(sg)
return to_delete, to_dedelta
def purge_room(self, room_id): def purge_room(self, room_id):
"""Deletes all record of a room """Deletes all record of a room
@ -1753,46 +1623,7 @@ class EventsStore(
return self.runInteraction("purge_room", self._purge_room_txn, room_id) return self.runInteraction("purge_room", self._purge_room_txn, room_id)
def _purge_room_txn(self, txn, room_id): def _purge_room_txn(self, txn, room_id):
# first we have to delete the state groups states # First delete tables which lack an index on room_id but have one on event_id
logger.info("[purge] removing %s from state_groups_state", room_id)
txn.execute(
"""
DELETE FROM state_groups_state WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)
txn.execute(
"""
DELETE FROM state_group_edges WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
txn.execute(
"""
DELETE FROM state_groups WHERE id IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# and then tables which lack an index on room_id but have one on event_id
for table in ( for table in (
"event_auth", "event_auth",
"event_edges", "event_edges",
@ -1881,6 +1712,153 @@ class EventsStore(
logger.info("[purge] done") logger.info("[purge] done")
def purge_unreferenced_state_groups(self, room_id, state_groups_to_delete):
"""Deletes no longer referenced state groups and de-deltas any state
groups that reference them.
"""
return self.runInteraction(
"purge_unreferenced_state_groups",
self._purge_unreferenced_state_groups,
room_id,
state_groups_to_delete,
)
def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=state_groups_to_delete,
keyvalues={},
retcols=("state_group",),
)
remaining_state_groups = set(
row["state_group"]
for row in rows
if row["state_group"] not in state_groups_to_delete
)
logger.info(
"[purge] de-delta-ing %i remaining state groups",
len(remaining_state_groups),
)
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
self._simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)
self._simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in iteritems(curr_state)
],
)
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)
@defer.inlineCallbacks
def get_previous_state_groups(self, state_groups):
"""Fetch the previous groups of the given state groups.
Args:
state_groups (Iterable[int])
Returns:
Deferred[dict[int, int]]: mapping from state group to previous
state group.
"""
rows = yield self._simple_select_many_batch(
table="state_group_edges",
column="prev_state_group",
iterable=state_groups,
keyvalues={},
retcols=("prev_state_group", "state_group"),
desc="get_previous_state_groups",
)
return {row["state_group"]: row["prev_state_group"] for row in rows}
def purge_room_state(self, room_id):
"""Deletes all record of a room from state tables
Args:
room_id (str):
"""
return self.runInteraction(
"purge_room_state", self._purge_room_state_txn, room_id
)
def _purge_room_state_txn(self, txn, room_id):
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)
txn.execute(
"""
DELETE FROM state_groups_state
INNER JOIN state_groups USING (event_id)
WHEREE state_groups.room_id = ?
""",
(room_id,),
)
# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)
txn.execute(
"""
DELETE FROM state_group_edges
INNER JOIN state_groups USING (event_id)
WHEREE state_groups.room_id = ?
)
""",
(room_id,),
)
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
txn.execute(
"""
DELETE FROM state_groups WHEREE room_id = ?
""",
(room_id,),
)
async def is_event_after(self, event_id1, event_id2): async def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream """Returns True if event_id1 is after event_id2 in the stream
""" """

View file

@ -989,6 +989,29 @@ class StateGroupWorkerStore(
return self.runInteraction("store_state_group", _store_state_group_txn) return self.runInteraction("store_state_group", _store_state_group_txn)
@defer.inlineCallbacks
def get_referenced_state_groups(self, state_groups):
"""Check if the state groups are referenced by events.
Args:
state_groups (Iterable[int])
Returns:
Deferred[set[int]]: The subset of state groups that are
referenced.
"""
rows = yield self._simple_select_many_batch(
table="event_to_state_groups",
column="state_group",
iterable=state_groups,
keyvalues={},
retcols=("DISTINCT state_group",),
desc="get_referenced_state_groups",
)
return set(row["state_group"] for row in rows)
class StateBackgroundUpdateStore( class StateBackgroundUpdateStore(
StateGroupBackgroundUpdateStore, BackgroundUpdateStore StateGroupBackgroundUpdateStore, BackgroundUpdateStore

View file

@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from twisted.internet import defer
logger = logging.getLogger(__name__)
class PurgeEventsStorage(object):
"""High level interface for purging rooms and event history.
"""
def __init__(self, hs, stores):
self.stores = stores
@defer.inlineCallbacks
def purge_room(self, room_id: str):
"""Deletes all record of a room
"""
yield self.stores.main.purge_room(room_id)
yield self.stores.main.purge_room_state(room_id)
@defer.inlineCallbacks
def purge_history(self, room_id, token, delete_local_events):
"""Deletes room history before a certain point
Args:
room_id (str):
token (str): A topological token to delete events before
delete_local_events (bool):
if True, we will delete local events as well as remote ones
(instead of just marking them as outliers and deleting their
state groups).
"""
state_groups = yield self.stores.main.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] finding state groups that can be deleted")
sg_to_delete = yield self._find_unreferenced_groups(state_groups)
yield self.stores.main.purge_unreferenced_state_groups(room_id, sg_to_delete)
@defer.inlineCallbacks
def _find_unreferenced_groups(self, state_groups):
"""Used when purging history to figure out which state groups can be
deleted.
Args:
state_groups (set[int]): Set of state groups referenced by events
that are going to be deleted.
Returns:
Deferred[set[int]] The set of state groups that can be deleted.
"""
# Graph of state group -> previous group
graph = {}
# Set of events that we have found to be referenced by events
referenced_groups = set()
# Set of state groups we've already seen
state_groups_seen = set(state_groups)
# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search
referenced = yield self.stores.main.get_referenced_state_groups(
current_search
)
referenced_groups |= referenced
# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced
edges = yield self.stores.main.get_previous_state_groups(current_search)
prevs = set(edges.values())
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs
graph.update(edges)
to_delete = state_groups_seen - referenced_groups
return to_delete

View file

@ -40,23 +40,24 @@ class PurgeTests(HomeserverTestCase):
third = self.helper.send(self.room_id, body="test3") third = self.helper.send(self.room_id, body="test3")
last = self.helper.send(self.room_id, body="test4") last = self.helper.send(self.room_id, body="test4")
storage = self.hs.get_datastore() store = self.hs.get_datastore()
storage = self.hs.get_storage()
# Get the topological token # Get the topological token
event = storage.get_topological_token_for_event(last["event_id"]) event = store.get_topological_token_for_event(last["event_id"])
self.pump() self.pump()
event = self.successResultOf(event) event = self.successResultOf(event)
# Purge everything before this topological token # Purge everything before this topological token
purge = storage.purge_history(self.room_id, event, True) purge = storage.purge_events.purge_history(self.room_id, event, True)
self.pump() self.pump()
self.assertEqual(self.successResultOf(purge), None) self.assertEqual(self.successResultOf(purge), None)
# Try and get the events # Try and get the events
get_first = storage.get_event(first["event_id"]) get_first = store.get_event(first["event_id"])
get_second = storage.get_event(second["event_id"]) get_second = store.get_event(second["event_id"])
get_third = storage.get_event(third["event_id"]) get_third = store.get_event(third["event_id"])
get_last = storage.get_event(last["event_id"]) get_last = store.get_event(last["event_id"])
self.pump() self.pump()
# 1-3 should fail and last will succeed, meaning that 1-3 are deleted # 1-3 should fail and last will succeed, meaning that 1-3 are deleted