forked from MirrorHub/synapse
Add profile data to the room_membership table for joins
This commit is contained in:
parent
de796f27e6
commit
c45d8e9ba2
4 changed files with 110 additions and 0 deletions
|
@ -222,6 +222,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
)
|
||||
|
||||
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
||||
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
||||
|
||||
super(DataStore, self).__init__(hs)
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ from synapse.api.constants import Membership, EventTypes
|
|||
from synapse.types import get_domain_from_id
|
||||
|
||||
import logging
|
||||
import ujson as json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -34,7 +35,15 @@ RoomsForUser = namedtuple(
|
|||
)
|
||||
|
||||
|
||||
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
|
||||
|
||||
|
||||
class RoomMemberStore(SQLBaseStore):
|
||||
def __init__(self, hs):
|
||||
super(RoomMemberStore, self).__init__(hs)
|
||||
self.register_background_update_handler(
|
||||
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
|
||||
)
|
||||
|
||||
def _store_room_members_txn(self, txn, events, backfilled):
|
||||
"""Store a room member in the database.
|
||||
|
@ -49,6 +58,8 @@ class RoomMemberStore(SQLBaseStore):
|
|||
"sender": event.user_id,
|
||||
"room_id": event.room_id,
|
||||
"membership": event.membership,
|
||||
"display_name": event.content.get("displayname", None),
|
||||
"avatar_url": event.content.get("avatar_url", None),
|
||||
}
|
||||
for event in events
|
||||
]
|
||||
|
@ -448,3 +459,78 @@ class RoomMemberStore(SQLBaseStore):
|
|||
defer.returnValue(True)
|
||||
|
||||
defer.returnValue(False)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_add_membership_profile(self, progress, batch_size):
|
||||
target_min_stream_id = progress.get(
|
||||
"target_min_stream_id_inclusive", self._min_stream_order_on_start
|
||||
)
|
||||
max_stream_id = progress.get(
|
||||
"max_stream_id_exclusive", self._stream_order_on_start + 1
|
||||
)
|
||||
|
||||
INSERT_CLUMP_SIZE = 1000
|
||||
|
||||
def add_membership_profile_txn(txn):
|
||||
sql = ("""
|
||||
SELECT stream_ordering, event_id, room_id, content
|
||||
FROM events
|
||||
INNER JOIN room_memberships USING (room_id, event_id)
|
||||
WHERE ? <= stream_ordering AND stream_ordering < ?
|
||||
AND type = 'm.room.member'
|
||||
ORDER BY stream_ordering DESC
|
||||
LIMIT ?
|
||||
""")
|
||||
|
||||
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
|
||||
|
||||
rows = self.cursor_to_dict(txn)
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
min_stream_id = rows[-1]["stream_ordering"]
|
||||
|
||||
to_update = []
|
||||
for row in rows:
|
||||
event_id = row["event_id"]
|
||||
room_id = row["room_id"]
|
||||
try:
|
||||
content = json.loads(row["content"])
|
||||
except:
|
||||
continue
|
||||
|
||||
display_name = content.get("displayname", None)
|
||||
avatar_url = content.get("avatar_url", None)
|
||||
|
||||
if display_name or avatar_url:
|
||||
to_update.append((
|
||||
display_name, avatar_url, event_id, room_id
|
||||
))
|
||||
|
||||
to_update_sql = ("""
|
||||
UPDATE room_memberships SET display_name = ?, avatar_url = ?
|
||||
WHERE event_id = ? AND room_id = ?
|
||||
""")
|
||||
for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
|
||||
clump = to_update[index:index + INSERT_CLUMP_SIZE]
|
||||
txn.executemany(to_update_sql, clump)
|
||||
|
||||
progress = {
|
||||
"target_min_stream_id_inclusive": target_min_stream_id,
|
||||
"max_stream_id_exclusive": min_stream_id,
|
||||
}
|
||||
|
||||
self._background_update_progress_txn(
|
||||
txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress
|
||||
)
|
||||
|
||||
return len(to_update)
|
||||
|
||||
result = yield self.runInteraction(
|
||||
_MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn
|
||||
)
|
||||
|
||||
if not result:
|
||||
yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
|
20
synapse/storage/schema/delta/39/membership_profile.sql
Normal file
20
synapse/storage/schema/delta/39/membership_profile.sql
Normal file
|
@ -0,0 +1,20 @@
|
|||
/* Copyright 2016 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
ALTER TABLE room_memberships ADD COLUMN display_name TEXT;
|
||||
ALTER TABLE room_memberships ADD COLUMN avatar_url TEXT;
|
||||
|
||||
INSERT into background_updates (update_name, progress_json)
|
||||
VALUES ('room_membership_profile_update', '{}');
|
|
@ -541,6 +541,9 @@ class StreamStore(SQLBaseStore):
|
|||
def get_room_max_stream_ordering(self):
|
||||
return self._stream_id_gen.get_current_token()
|
||||
|
||||
def get_room_min_stream_ordering(self):
|
||||
return self._backfill_id_gen.get_current_token()
|
||||
|
||||
def get_stream_token_for_event(self, event_id):
|
||||
"""The stream token for an event
|
||||
Args:
|
||||
|
|
Loading…
Reference in a new issue