forked from MirrorHub/synapse
Merge pull request #2946 from matrix-org/rav/timestamp_to_purge
Implement purge_history by timestamp
This commit is contained in:
commit
b2932107bb
4 changed files with 96 additions and 14 deletions
|
@ -8,9 +8,9 @@ Depending on the amount of history being purged a call to the API may take
|
||||||
several minutes or longer. During this period users will not be able to
|
several minutes or longer. During this period users will not be able to
|
||||||
paginate further back in the room from the point being purged from.
|
paginate further back in the room from the point being purged from.
|
||||||
|
|
||||||
The API is simply:
|
The API is:
|
||||||
|
|
||||||
``POST /_matrix/client/r0/admin/purge_history/<room_id>/<event_id>``
|
``POST /_matrix/client/r0/admin/purge_history/<room_id>[/<event_id>]``
|
||||||
|
|
||||||
including an ``access_token`` of a server admin.
|
including an ``access_token`` of a server admin.
|
||||||
|
|
||||||
|
@ -25,3 +25,10 @@ To delete local events as well, set ``delete_local_events`` in the body:
|
||||||
{
|
{
|
||||||
"delete_local_events": true
|
"delete_local_events": true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
The caller must specify the point in the room to purge up to. This can be
|
||||||
|
specified by including an event_id in the URI, or by setting a
|
||||||
|
``purge_up_to_event_id`` or ``purge_up_to_ts`` in the request body. If an event
|
||||||
|
id is given, that event (and others at the same graph depth) will be retained.
|
||||||
|
If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch,
|
||||||
|
in milliseconds.
|
||||||
|
|
|
@ -52,16 +52,12 @@ class MessageHandler(BaseHandler):
|
||||||
self.pagination_lock = ReadWriteLock()
|
self.pagination_lock = ReadWriteLock()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def purge_history(self, room_id, event_id, delete_local_events=False):
|
def purge_history(self, room_id, topological_ordering,
|
||||||
event = yield self.store.get_event(event_id)
|
delete_local_events=False):
|
||||||
|
|
||||||
if event.room_id != room_id:
|
|
||||||
raise SynapseError(400, "Event is for wrong room.")
|
|
||||||
|
|
||||||
depth = event.depth
|
|
||||||
|
|
||||||
with (yield self.pagination_lock.write(room_id)):
|
with (yield self.pagination_lock.write(room_id)):
|
||||||
yield self.store.purge_history(room_id, depth, delete_local_events)
|
yield self.store.purge_history(
|
||||||
|
room_id, topological_ordering, delete_local_events,
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_messages(self, requester, room_id=None, pagin_config=None,
|
def get_messages(self, requester, room_id=None, pagin_config=None,
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.constants import Membership
|
from synapse.api.constants import Membership
|
||||||
from synapse.api.errors import AuthError, SynapseError
|
from synapse.api.errors import AuthError, SynapseError, Codes
|
||||||
from synapse.types import UserID, create_requester
|
from synapse.types import UserID, create_requester
|
||||||
from synapse.http.servlet import parse_json_object_from_request
|
from synapse.http.servlet import parse_json_object_from_request
|
||||||
|
|
||||||
|
@ -114,12 +114,18 @@ class PurgeMediaCacheRestServlet(ClientV1RestServlet):
|
||||||
|
|
||||||
class PurgeHistoryRestServlet(ClientV1RestServlet):
|
class PurgeHistoryRestServlet(ClientV1RestServlet):
|
||||||
PATTERNS = client_path_patterns(
|
PATTERNS = client_path_patterns(
|
||||||
"/admin/purge_history/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
|
"/admin/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
hs (synapse.server.HomeServer)
|
||||||
|
"""
|
||||||
super(PurgeHistoryRestServlet, self).__init__(hs)
|
super(PurgeHistoryRestServlet, self).__init__(hs)
|
||||||
self.handlers = hs.get_handlers()
|
self.handlers = hs.get_handlers()
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_POST(self, request, room_id, event_id):
|
def on_POST(self, request, room_id, event_id):
|
||||||
|
@ -133,8 +139,54 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
|
||||||
|
|
||||||
delete_local_events = bool(body.get("delete_local_events", False))
|
delete_local_events = bool(body.get("delete_local_events", False))
|
||||||
|
|
||||||
|
# establish the topological ordering we should keep events from. The
|
||||||
|
# user can provide an event_id in the URL or the request body, or can
|
||||||
|
# provide a timestamp in the request body.
|
||||||
|
if event_id is None:
|
||||||
|
event_id = body.get('purge_up_to_event_id')
|
||||||
|
|
||||||
|
if event_id is not None:
|
||||||
|
event = yield self.store.get_event(event_id)
|
||||||
|
|
||||||
|
if event.room_id != room_id:
|
||||||
|
raise SynapseError(400, "Event is for wrong room.")
|
||||||
|
|
||||||
|
depth = event.depth
|
||||||
|
logger.info(
|
||||||
|
"[purge] purging up to depth %i (event_id %s)",
|
||||||
|
depth, event_id,
|
||||||
|
)
|
||||||
|
elif 'purge_up_to_ts' in body:
|
||||||
|
ts = body['purge_up_to_ts']
|
||||||
|
if not isinstance(ts, int):
|
||||||
|
raise SynapseError(
|
||||||
|
400, "purge_up_to_ts must be an int",
|
||||||
|
errcode=Codes.BAD_JSON,
|
||||||
|
)
|
||||||
|
|
||||||
|
stream_ordering = (
|
||||||
|
yield self.store.find_first_stream_ordering_after_ts(ts)
|
||||||
|
)
|
||||||
|
|
||||||
|
(_, depth, _) = (
|
||||||
|
yield self.store.get_room_event_after_stream_ordering(
|
||||||
|
room_id, stream_ordering,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"[purge] purging up to depth %i (received_ts %i => "
|
||||||
|
"stream_ordering %i)",
|
||||||
|
depth, ts, stream_ordering,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise SynapseError(
|
||||||
|
400,
|
||||||
|
"must specify purge_up_to_event_id or purge_up_to_ts",
|
||||||
|
errcode=Codes.BAD_JSON,
|
||||||
|
)
|
||||||
|
|
||||||
yield self.handlers.message_handler.purge_history(
|
yield self.handlers.message_handler.purge_history(
|
||||||
room_id, event_id,
|
room_id, depth,
|
||||||
delete_local_events=delete_local_events,
|
delete_local_events=delete_local_events,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -415,6 +415,33 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
"get_recent_events_for_room", get_recent_events_for_room_txn
|
"get_recent_events_for_room", get_recent_events_for_room_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
|
||||||
|
"""Gets details of the first event in a room at or after a stream ordering
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str):
|
||||||
|
stream_ordering (int):
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[(int, int, str)]:
|
||||||
|
(stream ordering, topological ordering, event_id)
|
||||||
|
"""
|
||||||
|
def _f(txn):
|
||||||
|
sql = (
|
||||||
|
"SELECT stream_ordering, topological_ordering, event_id"
|
||||||
|
" FROM events"
|
||||||
|
" WHERE room_id = ? AND stream_ordering >= ?"
|
||||||
|
" AND NOT outlier"
|
||||||
|
" ORDER BY stream_ordering"
|
||||||
|
" LIMIT 1"
|
||||||
|
)
|
||||||
|
txn.execute(sql, (room_id, stream_ordering, ))
|
||||||
|
return txn.fetchone()
|
||||||
|
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_room_event_after_stream_ordering", _f,
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_room_events_max_id(self, room_id=None):
|
def get_room_events_max_id(self, room_id=None):
|
||||||
"""Returns the current token for rooms stream.
|
"""Returns the current token for rooms stream.
|
||||||
|
|
Loading…
Add table
Reference in a new issue