forked from MirrorHub/synapse
Make deleting stale pushers a background update (#9536)
This commit is contained in:
parent
a5daae2a5f
commit
16f9f93eb7
3 changed files with 55 additions and 1 deletions
1
changelog.d/9536.bugfix
Normal file
1
changelog.d/9536.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix deleting pushers when using sharded pushers.
|
|
@ -44,6 +44,11 @@ class PusherWorkerStore(SQLBaseStore):
|
||||||
self._remove_deactivated_pushers,
|
self._remove_deactivated_pushers,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.db_pool.updates.register_background_update_handler(
|
||||||
|
"remove_stale_pushers",
|
||||||
|
self._remove_stale_pushers,
|
||||||
|
)
|
||||||
|
|
||||||
def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
|
def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
|
||||||
"""JSON-decode the data in the rows returned from the `pushers` table
|
"""JSON-decode the data in the rows returned from the `pushers` table
|
||||||
|
|
||||||
|
@ -337,6 +342,53 @@ class PusherWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
return number_deleted
|
return number_deleted
|
||||||
|
|
||||||
|
async def _remove_stale_pushers(self, progress: dict, batch_size: int) -> int:
|
||||||
|
"""A background update that deletes all pushers for logged out devices.
|
||||||
|
|
||||||
|
Note that we don't proacively tell the pusherpool that we've deleted
|
||||||
|
these (just because its a bit off a faff to do from here), but they will
|
||||||
|
get cleaned up at the next restart
|
||||||
|
"""
|
||||||
|
|
||||||
|
last_pusher = progress.get("last_pusher", 0)
|
||||||
|
|
||||||
|
def _delete_pushers(txn) -> int:
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
SELECT p.id, access_token FROM pushers AS p
|
||||||
|
LEFT JOIN access_tokens AS a ON (p.access_token = a.id)
|
||||||
|
WHERE p.id > ?
|
||||||
|
ORDER BY p.id ASC
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (last_pusher, batch_size))
|
||||||
|
pushers = [(row[0], row[1]) for row in txn]
|
||||||
|
|
||||||
|
self.db_pool.simple_delete_many_txn(
|
||||||
|
txn,
|
||||||
|
table="pushers",
|
||||||
|
column="id",
|
||||||
|
iterable=(pusher_id for pusher_id, token in pushers if token is None),
|
||||||
|
keyvalues={},
|
||||||
|
)
|
||||||
|
|
||||||
|
if pushers:
|
||||||
|
self.db_pool.updates._background_update_progress_txn(
|
||||||
|
txn, "remove_stale_pushers", {"last_pusher": pushers[-1][0]}
|
||||||
|
)
|
||||||
|
|
||||||
|
return len(pushers)
|
||||||
|
|
||||||
|
number_deleted = await self.db_pool.runInteraction(
|
||||||
|
"_remove_stale_pushers", _delete_pushers
|
||||||
|
)
|
||||||
|
|
||||||
|
if number_deleted < batch_size:
|
||||||
|
await self.db_pool.updates._end_background_update("remove_stale_pushers")
|
||||||
|
|
||||||
|
return number_deleted
|
||||||
|
|
||||||
|
|
||||||
class PusherStore(PusherWorkerStore):
|
class PusherStore(PusherWorkerStore):
|
||||||
def get_pushers_stream_token(self) -> int:
|
def get_pushers_stream_token(self) -> int:
|
||||||
|
|
|
@ -16,4 +16,5 @@
|
||||||
|
|
||||||
-- Delete all pushers associated with deleted devices. This is to clear up after
|
-- Delete all pushers associated with deleted devices. This is to clear up after
|
||||||
-- a bug where they weren't correctly deleted when using workers.
|
-- a bug where they weren't correctly deleted when using workers.
|
||||||
DELETE FROM pushers WHERE access_token NOT IN (SELECT id FROM access_tokens);
|
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||||
|
(5908, 'remove_stale_pushers', '{}');
|
||||||
|
|
Loading…
Reference in a new issue