0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-11-16 15:01:23 +01:00

Merge pull request #5706 from matrix-org/erikj/add_memberships_to_current_state

Add membership column to current_state_events table
This commit is contained in:
Erik Johnston 2019-07-19 16:30:33 +01:00 committed by GitHub
commit 5c07c97c09
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 195 additions and 41 deletions

1
changelog.d/5706.misc Normal file
View file

@ -0,0 +1 @@
Reduce database IO usage by optimising queries for current membership.

View file

@ -86,7 +86,21 @@ _CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
class LoggingTransaction(object): class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object """An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging and metrics to the .execute() passed to the constructor. Adds logging and metrics to the .execute()
method.""" method.
Args:
txn: The database transcation object to wrap.
name (str): The name of this transactions for logging.
database_engine (Sqlite3Engine|PostgresEngine)
after_callbacks(list|None): A list that callbacks will be appended to
that have been added by `call_after` which should be run on
successful completion of the transaction. None indicates that no
callbacks should be allowed to be scheduled to run.
exception_callbacks(list|None): A list that callbacks will be appended
to that have been added by `call_on_exception` which should be run
if transaction ends with an error. None indicates that no callbacks
should be allowed to be scheduled to run.
"""
__slots__ = [ __slots__ = [
"txn", "txn",
@ -97,7 +111,7 @@ class LoggingTransaction(object):
] ]
def __init__( def __init__(
self, txn, name, database_engine, after_callbacks, exception_callbacks self, txn, name, database_engine, after_callbacks=None, exception_callbacks=None
): ):
object.__setattr__(self, "txn", txn) object.__setattr__(self, "txn", txn)
object.__setattr__(self, "name", name) object.__setattr__(self, "name", name)

View file

@ -79,8 +79,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
db_conn.cursor(), db_conn.cursor(),
name="_find_stream_orderings_for_times_txn", name="_find_stream_orderings_for_times_txn",
database_engine=self.database_engine, database_engine=self.database_engine,
after_callbacks=[],
exception_callbacks=[],
) )
self._find_stream_orderings_for_times_txn(cur) self._find_stream_orderings_for_times_txn(cur)
cur.close() cur.close()

View file

@ -918,8 +918,6 @@ class EventsStore(
min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
self._update_forward_extremities_txn( self._update_forward_extremities_txn(
txn, txn,
new_forward_extremities=new_forward_extremeties, new_forward_extremities=new_forward_extremeties,
@ -993,6 +991,10 @@ class EventsStore(
backfilled=backfilled, backfilled=backfilled,
) )
# We call this last as it assumes we've inserted the events into
# room_memberships, where applicable.
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
def _update_current_state_txn(self, txn, state_delta_by_room, stream_id): def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
for room_id, current_state_tuple in iteritems(state_delta_by_room): for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple to_delete, to_insert = current_state_tuple
@ -1062,16 +1064,16 @@ class EventsStore(
), ),
) )
self._simple_insert_many_txn( # We include the membership in the current state table, hence we do
txn, # a lookup when we insert. This assumes that all events have already
table="current_state_events", # been inserted into room_memberships.
values=[ txn.executemany(
{ """INSERT INTO current_state_events
"event_id": ev_id, (room_id, type, state_key, event_id, membership)
"room_id": room_id, VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
"type": key[0], """,
"state_key": key[1], [
} (room_id, key[0], key[1], ev_id, ev_id)
for key, ev_id in iteritems(to_insert) for key, ev_id in iteritems(to_insert)
], ],
) )

View file

@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database # Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts. # schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 55 SCHEMA_VERSION = 56
dir_path = os.path.abspath(os.path.dirname(__file__)) dir_path = os.path.abspath(os.path.dirname(__file__))

View file

@ -24,6 +24,8 @@ from canonicaljson import json
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction
from synapse.storage.events_worker import EventsWorkerStore from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import get_domain_from_id from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
@ -53,9 +55,51 @@ ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))
MemberSummary = namedtuple("MemberSummary", ("members", "count")) MemberSummary = namedtuple("MemberSummary", ("members", "count"))
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
class RoomMemberWorkerStore(EventsWorkerStore): class RoomMemberWorkerStore(EventsWorkerStore):
def __init__(self, db_conn, hs):
super(RoomMemberWorkerStore, self).__init__(db_conn, hs)
# Is the current_state_events.membership up to date? Or is the
# background update still running?
self._current_state_events_membership_up_to_date = False
txn = LoggingTransaction(
db_conn.cursor(),
name="_check_safe_current_state_events_membership_updated",
database_engine=self.database_engine,
)
self._check_safe_current_state_events_membership_updated_txn(txn)
txn.close()
def _check_safe_current_state_events_membership_updated_txn(self, txn):
"""Checks if it is safe to assume the new current_state_events
membership column is up to date
"""
pending_update = self._simple_select_one_txn(
txn,
table="background_updates",
keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
retcols=["update_name"],
allow_none=True,
)
self._current_state_events_membership_up_to_date = not pending_update
# If the update is still running, reschedule to run.
if pending_update:
self._clock.call_later(
15.0,
run_as_background_process,
"_check_safe_current_state_events_membership_updated",
self.runInteraction,
"_check_safe_current_state_events_membership_updated",
self._check_safe_current_state_events_membership_updated_txn,
)
@cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True) @cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True)
def get_hosts_in_room(self, room_id, cache_context): def get_hosts_in_room(self, room_id, cache_context):
"""Returns the set of all hosts currently in the room """Returns the set of all hosts currently in the room
@ -69,14 +113,23 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cached(max_entries=100000, iterable=True) @cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id): def get_users_in_room(self, room_id):
def f(txn): def f(txn):
sql = ( # If we can assume current_state_events.membership is up to date
"SELECT m.user_id FROM room_memberships as m" # then we can avoid a join, which is a Very Good Thing given how
" INNER JOIN current_state_events as c" # frequently this function gets called.
" ON m.event_id = c.event_id " if self._current_state_events_membership_up_to_date:
" AND m.room_id = c.room_id " sql = """
" AND m.user_id = c.state_key" SELECT state_key FROM current_state_events
" WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?" WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
) """
else:
sql = """
SELECT state_key FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
"""
txn.execute(sql, (room_id, Membership.JOIN)) txn.execute(sql, (room_id, Membership.JOIN))
return [to_ascii(r[0]) for r in txn] return [to_ascii(r[0]) for r in txn]
@ -98,15 +151,26 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# first get counts. # first get counts.
# We do this all in one transaction to keep the cache small. # We do this all in one transaction to keep the cache small.
# FIXME: get rid of this when we have room_stats # FIXME: get rid of this when we have room_stats
sql = """
SELECT count(*), m.membership FROM room_memberships as m # If we can assume current_state_events.membership is up to date
INNER JOIN current_state_events as c # then we can avoid a join, which is a Very Good Thing given how
ON m.event_id = c.event_id # frequently this function gets called.
AND m.room_id = c.room_id if self._current_state_events_membership_up_to_date:
AND m.user_id = c.state_key sql = """
WHERE c.type = 'm.room.member' AND c.room_id = ? SELECT count(*), membership FROM current_state_events
GROUP BY m.membership WHERE type = 'm.room.member' AND room_id = ?
""" GROUP BY membership
"""
else:
sql = """
SELECT count(*), m.membership FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ?
GROUP BY m.membership
"""
txn.execute(sql, (room_id,)) txn.execute(sql, (room_id,))
res = {} res = {}
@ -224,7 +288,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
results = [] results = []
if membership_list: if membership_list:
where_clause = "user_id = ? AND (%s) AND forgotten = 0" % ( where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
" OR ".join(["membership = ?" for _ in membership_list]), " OR ".join(["m.membership = ?" for _ in membership_list]),
) )
args = [user_id] args = [user_id]
@ -453,8 +517,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
sql = """ sql = """
SELECT state_key FROM current_state_events AS c SELECT state_key FROM current_state_events AS c
INNER JOIN room_memberships USING (event_id) INNER JOIN room_memberships AS m USING (event_id)
WHERE membership = 'join' WHERE m.membership = 'join'
AND type = 'm.room.member' AND type = 'm.room.member'
AND c.room_id = ? AND c.room_id = ?
AND state_key LIKE ? AND state_key LIKE ?
@ -602,6 +666,10 @@ class RoomMemberStore(RoomMemberWorkerStore):
self.register_background_update_handler( self.register_background_update_handler(
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
) )
self.register_background_update_handler(
_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
self._background_current_state_membership,
)
def _store_room_members_txn(self, txn, events, backfilled): def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database. """Store a room member in the database.
@ -781,6 +849,52 @@ class RoomMemberStore(RoomMemberWorkerStore):
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks
def _background_current_state_membership(self, progress, batch_size):
"""Update the new membership column on current_state_events.
"""
if "rooms" not in progress:
rooms = yield self._simple_select_onecol(
table="current_state_events",
keyvalues={},
retcol="DISTINCT room_id",
desc="_background_current_state_membership_get_rooms",
)
progress["rooms"] = rooms
rooms = progress["rooms"]
def _background_current_state_membership_txn(txn):
processed = 0
while rooms and processed < batch_size:
sql = """
UPDATE current_state_events AS c
SET membership = (
SELECT membership FROM room_memberships
WHERE event_id = c.event_id
)
WHERE room_id = ?
"""
txn.execute(sql, (rooms.pop(),))
processed += txn.rowcount
self._background_update_progress_txn(
txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress
)
return processed
result = yield self.runInteraction(
"_background_current_state_membership_update",
_background_current_state_membership_txn,
)
if not rooms:
yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
defer.returnValue(result)
class _JoinedHostsCache(object): class _JoinedHostsCache(object):
"""Cache for joined hosts in a room that is optimised to handle updates """Cache for joined hosts in a room that is optimised to handle updates

View file

@ -0,0 +1,25 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- We add membership to current state so that we don't need to join against
-- room_memberships, which can be surprisingly costly (we do such queries
-- very frequently).
-- This will be null for non-membership events and the content.membership key
-- for membership events. (Will also be null for membership events until the
-- background update job has finished).
ALTER TABLE current_state_events ADD membership TEXT;
INSERT INTO background_updates (update_name, progress_json) VALUES
('current_state_events_membership', '{}');

View file

@ -618,15 +618,15 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
sql = """ sql = """
SELECT room_id FROM ( SELECT room_id FROM (
SELECT c.room_id FROM current_state_events AS c SELECT c.room_id FROM current_state_events AS c
INNER JOIN room_memberships USING (event_id) INNER JOIN room_memberships AS m USING (event_id)
WHERE type = 'm.room.member' WHERE type = 'm.room.member'
AND membership = 'join' AND m.membership = 'join'
AND state_key = ? AND state_key = ?
) AS f1 INNER JOIN ( ) AS f1 INNER JOIN (
SELECT c.room_id FROM current_state_events AS c SELECT c.room_id FROM current_state_events AS c
INNER JOIN room_memberships USING (event_id) INNER JOIN room_memberships AS m USING (event_id)
WHERE type = 'm.room.member' WHERE type = 'm.room.member'
AND membership = 'join' AND m.membership = 'join'
AND state_key = ? AND state_key = ?
) f2 USING (room_id) ) f2 USING (room_id)
""" """