forked from MirrorHub/synapse
Add background update for current_state_events.membership column
This commit is contained in:
parent
6de09e07a6
commit
c618a5d348
2 changed files with 54 additions and 0 deletions
synapse/storage
|
@ -53,6 +53,7 @@ ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))
|
||||||
MemberSummary = namedtuple("MemberSummary", ("members", "count"))
|
MemberSummary = namedtuple("MemberSummary", ("members", "count"))
|
||||||
|
|
||||||
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
|
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
|
||||||
|
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
|
||||||
|
|
||||||
|
|
||||||
class RoomMemberWorkerStore(EventsWorkerStore):
|
class RoomMemberWorkerStore(EventsWorkerStore):
|
||||||
|
@ -602,6 +603,10 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||||
self.register_background_update_handler(
|
self.register_background_update_handler(
|
||||||
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
|
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
|
||||||
)
|
)
|
||||||
|
self.register_background_update_handler(
|
||||||
|
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
|
||||||
|
self._background_current_state_membership,
|
||||||
|
)
|
||||||
|
|
||||||
def _store_room_members_txn(self, txn, events, backfilled):
|
def _store_room_members_txn(self, txn, events, backfilled):
|
||||||
"""Store a room member in the database.
|
"""Store a room member in the database.
|
||||||
|
@ -781,6 +786,52 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||||
|
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _background_current_state_membership(self, progress, batch_size):
|
||||||
|
"""Update the new membership column on current_state_events.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if "rooms" not in progress:
|
||||||
|
rooms = yield self._simple_select_onecol(
|
||||||
|
table="current_state_events",
|
||||||
|
keyvalues={},
|
||||||
|
retcol="DISTINCT room_id",
|
||||||
|
desc="_background_current_state_membership_get_rooms",
|
||||||
|
)
|
||||||
|
progress["rooms"] = rooms
|
||||||
|
|
||||||
|
rooms = progress["rooms"]
|
||||||
|
|
||||||
|
def _background_current_state_membership_txn(txn):
|
||||||
|
processed = 0
|
||||||
|
while rooms and processed < batch_size:
|
||||||
|
sql = """
|
||||||
|
UPDATE current_state_events AS c
|
||||||
|
SET membership = (
|
||||||
|
SELECT membership FROM room_memberships
|
||||||
|
WHERE event_id = c.event_id
|
||||||
|
)
|
||||||
|
WHERE room_id = ?
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (rooms.pop(),))
|
||||||
|
processed += txn.rowcount
|
||||||
|
|
||||||
|
self._background_update_progress_txn(
|
||||||
|
txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress
|
||||||
|
)
|
||||||
|
|
||||||
|
return processed
|
||||||
|
|
||||||
|
result = yield self.runInteraction(
|
||||||
|
"_background_current_state_membership_update",
|
||||||
|
_background_current_state_membership_txn,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not rooms:
|
||||||
|
yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
|
||||||
|
|
||||||
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
|
||||||
class _JoinedHostsCache(object):
|
class _JoinedHostsCache(object):
|
||||||
"""Cache for joined hosts in a room that is optimised to handle updates
|
"""Cache for joined hosts in a room that is optimised to handle updates
|
||||||
|
|
|
@ -17,3 +17,6 @@
|
||||||
-- room_memberships, which can be surprisingly costly (we do such queries
|
-- room_memberships, which can be surprisingly costly (we do such queries
|
||||||
-- very frequently).
|
-- very frequently).
|
||||||
ALTER TABLE current_state_events ADD membership TEXT;
|
ALTER TABLE current_state_events ADD membership TEXT;
|
||||||
|
|
||||||
|
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||||
|
('current_state_events_membership', '{}');
|
||||||
|
|
Loading…
Add table
Reference in a new issue