forked from MirrorHub/synapse
Add remote profile cache
This commit is contained in:
parent
97c544f91f
commit
27ebc5c8f2
5 changed files with 237 additions and 5 deletions
|
@ -503,6 +503,13 @@ class GroupsServerHandler(object):
|
||||||
get_domain_from_id(user_id), group_id, user_id, content
|
get_domain_from_id(user_id), group_id, user_id, content
|
||||||
)
|
)
|
||||||
|
|
||||||
|
user_profile = res.get("user_profile", {})
|
||||||
|
yield self.store.add_remote_profile_cache(
|
||||||
|
user_id,
|
||||||
|
displayname=user_profile.get("displayname"),
|
||||||
|
avatar_url=user_profile.get("avatar_url"),
|
||||||
|
)
|
||||||
|
|
||||||
if res["state"] == "join":
|
if res["state"] == "join":
|
||||||
if not self.hs.is_mine_id(user_id):
|
if not self.hs.is_mine_id(user_id):
|
||||||
remote_attestation = res["attestation"]
|
remote_attestation = res["attestation"]
|
||||||
|
@ -627,6 +634,9 @@ class GroupsServerHandler(object):
|
||||||
get_domain_from_id(user_id), group_id, user_id, {}
|
get_domain_from_id(user_id), group_id, user_id, {}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if not self.hs.is_mine_id(user_id):
|
||||||
|
yield self.store.maybe_delete_remote_profile_cache(user_id)
|
||||||
|
|
||||||
defer.returnValue({})
|
defer.returnValue({})
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -647,6 +657,7 @@ class GroupsServerHandler(object):
|
||||||
avatar_url = profile.get("avatar_url")
|
avatar_url = profile.get("avatar_url")
|
||||||
short_description = profile.get("short_description")
|
short_description = profile.get("short_description")
|
||||||
long_description = profile.get("long_description")
|
long_description = profile.get("long_description")
|
||||||
|
user_profile = content.get("user_profile", {})
|
||||||
|
|
||||||
yield self.store.create_group(
|
yield self.store.create_group(
|
||||||
group_id,
|
group_id,
|
||||||
|
@ -679,6 +690,13 @@ class GroupsServerHandler(object):
|
||||||
remote_attestation=remote_attestation,
|
remote_attestation=remote_attestation,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if not self.hs.is_mine_id(user_id):
|
||||||
|
yield self.store.add_remote_profile_cache(
|
||||||
|
user_id,
|
||||||
|
displayname=user_profile.get("displayname"),
|
||||||
|
avatar_url=user_profile.get("avatar_url"),
|
||||||
|
)
|
||||||
|
|
||||||
defer.returnValue({
|
defer.returnValue({
|
||||||
"group_id": group_id,
|
"group_id": group_id,
|
||||||
})
|
})
|
||||||
|
|
|
@ -56,6 +56,9 @@ class GroupsLocalHandler(object):
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self.attestations = hs.get_groups_attestation_signing()
|
self.attestations = hs.get_groups_attestation_signing()
|
||||||
|
|
||||||
|
handlers = hs.get_handlers()
|
||||||
|
self.profile_handler = handlers.profile_handler
|
||||||
|
|
||||||
# Ensure attestations get renewed
|
# Ensure attestations get renewed
|
||||||
hs.get_groups_attestation_renewer()
|
hs.get_groups_attestation_renewer()
|
||||||
|
|
||||||
|
@ -123,6 +126,7 @@ class GroupsLocalHandler(object):
|
||||||
|
|
||||||
defer.returnValue(res)
|
defer.returnValue(res)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def create_group(self, group_id, user_id, content):
|
def create_group(self, group_id, user_id, content):
|
||||||
"""Create a group
|
"""Create a group
|
||||||
"""
|
"""
|
||||||
|
@ -130,13 +134,16 @@ class GroupsLocalHandler(object):
|
||||||
logger.info("Asking to create group with ID: %r", group_id)
|
logger.info("Asking to create group with ID: %r", group_id)
|
||||||
|
|
||||||
if self.is_mine_id(group_id):
|
if self.is_mine_id(group_id):
|
||||||
return self.groups_server_handler.create_group(
|
res = yield self.groups_server_handler.create_group(
|
||||||
group_id, user_id, content
|
group_id, user_id, content
|
||||||
)
|
)
|
||||||
|
defer.returnValue(res)
|
||||||
|
|
||||||
return self.transport_client.create_group(
|
content["user_profile"] = yield self.profile_handler.get_profile(user_id)
|
||||||
|
res = yield self.transport_client.create_group(
|
||||||
get_domain_from_id(group_id), group_id, user_id, content,
|
get_domain_from_id(group_id), group_id, user_id, content,
|
||||||
) # TODO
|
)
|
||||||
|
defer.returnValue(res)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_users_in_group(self, group_id, requester_user_id):
|
def get_users_in_group(self, group_id, requester_user_id):
|
||||||
|
@ -265,7 +272,9 @@ class GroupsLocalHandler(object):
|
||||||
"groups_key", token, users=[user_id],
|
"groups_key", token, users=[user_id],
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue({"state": "invite"})
|
user_profile = yield self.profile_handler.get_profile(user_id)
|
||||||
|
|
||||||
|
defer.returnValue({"state": "invite", "user_profile": user_profile})
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def remove_user_from_group(self, group_id, user_id, requester_user_id, content):
|
def remove_user_from_group(self, group_id, user_id, requester_user_id, content):
|
||||||
|
|
|
@ -19,7 +19,7 @@ from twisted.internet import defer
|
||||||
|
|
||||||
import synapse.types
|
import synapse.types
|
||||||
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
|
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
|
||||||
from synapse.types import UserID
|
from synapse.types import UserID, get_domain_from_id
|
||||||
from ._base import BaseHandler
|
from ._base import BaseHandler
|
||||||
|
|
||||||
|
|
||||||
|
@ -27,15 +27,53 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ProfileHandler(BaseHandler):
|
class ProfileHandler(BaseHandler):
|
||||||
|
PROFILE_UPDATE_MS = 60 * 1000
|
||||||
|
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(ProfileHandler, self).__init__(hs)
|
super(ProfileHandler, self).__init__(hs)
|
||||||
|
|
||||||
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
self.federation = hs.get_replication_layer()
|
self.federation = hs.get_replication_layer()
|
||||||
self.federation.register_query_handler(
|
self.federation.register_query_handler(
|
||||||
"profile", self.on_profile_query
|
"profile", self.on_profile_query
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_profile(self, user_id):
|
||||||
|
target_user = UserID.from_string(user_id)
|
||||||
|
if self.hs.is_mine(target_user):
|
||||||
|
displayname = yield self.store.get_profile_displayname(
|
||||||
|
target_user.localpart
|
||||||
|
)
|
||||||
|
avatar_url = yield self.store.get_profile_avatar_url(
|
||||||
|
target_user.localpart
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue({
|
||||||
|
"displayname": displayname,
|
||||||
|
"avatar_url": avatar_url,
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
result = yield self.federation.make_query(
|
||||||
|
destination=target_user.domain,
|
||||||
|
query_type="profile",
|
||||||
|
args={
|
||||||
|
"user_id": user_id,
|
||||||
|
},
|
||||||
|
ignore_backoff=True,
|
||||||
|
)
|
||||||
|
defer.returnValue(result)
|
||||||
|
except CodeMessageException as e:
|
||||||
|
if e.code != 404:
|
||||||
|
logger.exception("Failed to get displayname")
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_displayname(self, target_user):
|
def get_displayname(self, target_user):
|
||||||
if self.hs.is_mine(target_user):
|
if self.hs.is_mine(target_user):
|
||||||
|
@ -182,3 +220,44 @@ class ProfileHandler(BaseHandler):
|
||||||
"Failed to update join event for room %s - %s",
|
"Failed to update join event for room %s - %s",
|
||||||
room_id, str(e.message)
|
room_id, str(e.message)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _update_remote_profile_cache(self):
|
||||||
|
"""Called periodically to check profiles of remote users we havent'
|
||||||
|
checked in a while.
|
||||||
|
"""
|
||||||
|
entries = yield self.store.get_remote_profile_cache_entries_that_expire(
|
||||||
|
last_checked=self.clock.time_msec() - self.PROFILE_UPDATE_EVERY_MS
|
||||||
|
)
|
||||||
|
|
||||||
|
for user_id, displayname, avatar_url in entries:
|
||||||
|
is_subcscribed = yield self.store.is_subscribed_remote_profile_for_user(
|
||||||
|
user_id,
|
||||||
|
)
|
||||||
|
if not is_subcscribed:
|
||||||
|
yield self.store.maybe_delete_remote_profile_cache(user_id)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
profile = yield self.federation.make_query(
|
||||||
|
destination=get_domain_from_id(user_id),
|
||||||
|
query_type="profile",
|
||||||
|
args={
|
||||||
|
"user_id": user_id,
|
||||||
|
},
|
||||||
|
ignore_backoff=True,
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
logger.exception("Failed to get avatar_url")
|
||||||
|
|
||||||
|
yield self.store.update_remote_profile_cache(
|
||||||
|
user_id, displayname, avatar_url
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
new_name = profile.get("displayname")
|
||||||
|
new_avatar = profile.get("avatar_url")
|
||||||
|
|
||||||
|
# We always hit update to update the last_check timestamp
|
||||||
|
yield self.store.update_remote_profile_cache(
|
||||||
|
user_id, new_name, new_avatar
|
||||||
|
)
|
||||||
|
|
|
@ -13,6 +13,8 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
|
|
||||||
|
|
||||||
|
@ -55,3 +57,99 @@ class ProfileStore(SQLBaseStore):
|
||||||
updatevalues={"avatar_url": new_avatar_url},
|
updatevalues={"avatar_url": new_avatar_url},
|
||||||
desc="set_profile_avatar_url",
|
desc="set_profile_avatar_url",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_from_remote_profile_cache(self, user_id):
|
||||||
|
return self._simple_select_one(
|
||||||
|
table="remote_profile_cache",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
retcols=("displayname", "avatar_url", "last_check"),
|
||||||
|
allow_none=True,
|
||||||
|
desc="get_from_remote_profile_cache",
|
||||||
|
)
|
||||||
|
|
||||||
|
def add_remote_profile_cache(self, user_id, displayname, avatar_url):
|
||||||
|
"""Ensure we are caching the remote user's profiles.
|
||||||
|
|
||||||
|
This should only be called when `is_subscribed_remote_profile_for_user`
|
||||||
|
would return true for the user.
|
||||||
|
"""
|
||||||
|
return self._simple_upsert(
|
||||||
|
table="remote_profile_cache",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
values={
|
||||||
|
"displayname": displayname,
|
||||||
|
"avatar_url": avatar_url,
|
||||||
|
"last_check": self._clock.time_msec(),
|
||||||
|
},
|
||||||
|
desc="add_remote_profile_cache",
|
||||||
|
)
|
||||||
|
|
||||||
|
def update_remote_profile_cache(self, user_id, displayname, avatar_url):
|
||||||
|
return self._simple_update(
|
||||||
|
table="remote_profile_cache",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
values={
|
||||||
|
"displayname": displayname,
|
||||||
|
"avatar_url": avatar_url,
|
||||||
|
"last_check": self._clock.time_msec(),
|
||||||
|
},
|
||||||
|
desc="update_remote_profile_cache",
|
||||||
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def maybe_delete_remote_profile_cache(self, user_id):
|
||||||
|
"""Check if we still care about the remote user's profile, and if we
|
||||||
|
don't then remove their profile from the cache
|
||||||
|
"""
|
||||||
|
subscribed = yield self.is_subscribed_remote_profile_for_user(user_id)
|
||||||
|
if not subscribed:
|
||||||
|
yield self._simple_delete(
|
||||||
|
table="remote_profile_cache",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
desc="delete_remote_profile_cache",
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_remote_profile_cache_entries_that_expire(self, last_checked):
|
||||||
|
"""Get all users who haven't been checked since `last_checked`
|
||||||
|
"""
|
||||||
|
def _get_remote_profile_cache_entries_that_expire_txn(txn):
|
||||||
|
sql = """
|
||||||
|
SELECT user_id, displayname, avatar_url
|
||||||
|
FROM remote_profile_cache
|
||||||
|
WHERE last_check < ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (last_checked,))
|
||||||
|
|
||||||
|
return self.cursor_to_dict(txn)
|
||||||
|
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_remote_profile_cache_entries_that_expire",
|
||||||
|
_get_remote_profile_cache_entries_that_expire_txn,
|
||||||
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def is_subscribed_remote_profile_for_user(self, user_id):
|
||||||
|
"""Check whether we are interested in a remote user's profile.
|
||||||
|
"""
|
||||||
|
res = yield self._simple_select_one_onecol(
|
||||||
|
table="group_users",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
retcol="user_id",
|
||||||
|
allow_none=True,
|
||||||
|
desc="should_update_remote_profile_cache_for_user",
|
||||||
|
)
|
||||||
|
|
||||||
|
if res:
|
||||||
|
defer.returnValue(True)
|
||||||
|
|
||||||
|
res = yield self._simple_select_one_onecol(
|
||||||
|
table="group_invites",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
retcol="user_id",
|
||||||
|
allow_none=True,
|
||||||
|
desc="should_update_remote_profile_cache_for_user",
|
||||||
|
)
|
||||||
|
|
||||||
|
if res:
|
||||||
|
defer.returnValue(True)
|
||||||
|
|
28
synapse/storage/schema/delta/43/profile_cache.sql
Normal file
28
synapse/storage/schema/delta/43/profile_cache.sql
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
/* Copyright 2017 New Vector Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
-- A subset of remote users whose profiles we have cached.
|
||||||
|
-- Whether a user is in this table or not is defined by the storage function
|
||||||
|
-- `is_subscribed_remote_profile_for_user`
|
||||||
|
CREATE TABLE remote_profile_cache (
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
displayname TEXT,
|
||||||
|
avatar_url TEXT,
|
||||||
|
last_check BIGINT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache(user_id);
|
||||||
|
CREATE INDEX remote_profile_cache_time ON remote_profile_cache(last_check);
|
Loading…
Reference in a new issue