forked from MirrorHub/synapse
Merge remote-tracking branch 'origin/develop' into dbkr/http_request_propagate_error
This commit is contained in:
commit
81804909d3
19 changed files with 309 additions and 122 deletions
19
README.rst
19
README.rst
|
@ -245,6 +245,25 @@ Setting up a TURN server
|
|||
For reliable VoIP calls to be routed via this homeserver, you MUST configure
|
||||
a TURN server. See `<docs/turn-howto.rst>`_ for details.
|
||||
|
||||
IPv6
|
||||
----
|
||||
|
||||
As of Synapse 0.19 we finally support IPv6, many thanks to @kyrias and @glyph
|
||||
for providing PR #1696.
|
||||
|
||||
However, for federation to work on hosts with IPv6 DNS servers you **must**
|
||||
be running Twisted 17.1.0 or later - see https://github.com/matrix-org/synapse/issues/1002
|
||||
for details. We can't make Synapse depend on Twisted 17.1 by default
|
||||
yet as it will break most older distributions (see https://github.com/matrix-org/synapse/pull/1909)
|
||||
so if you are using operating system dependencies you'll have to install your
|
||||
own Twisted 17.1 package via pip or backports etc.
|
||||
|
||||
If you're running in a virtualenv then pip should have installed the newest
|
||||
Twisted automatically, but if your virtualenv is old you will need to manually
|
||||
upgrade to a newer Twisted dependency via:
|
||||
|
||||
pip install Twisted>=17.1.0
|
||||
|
||||
|
||||
Running Synapse
|
||||
===============
|
||||
|
|
|
@ -36,15 +36,13 @@ class HttpClient(object):
|
|||
the request body. This will be encoded as JSON.
|
||||
|
||||
Returns:
|
||||
Deferred: Succeeds when we get *any* HTTP response.
|
||||
|
||||
The result of the deferred is a tuple of `(code, response)`,
|
||||
where `response` is a dict representing the decoded JSON body.
|
||||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||
will be the decoded JSON body.
|
||||
"""
|
||||
pass
|
||||
|
||||
def get_json(self, url, args=None):
|
||||
""" Get's some json from the given host homeserver and path
|
||||
""" Gets some json from the given host homeserver and path
|
||||
|
||||
Args:
|
||||
url (str): The URL to GET data from.
|
||||
|
@ -54,10 +52,8 @@ class HttpClient(object):
|
|||
and *not* a string.
|
||||
|
||||
Returns:
|
||||
Deferred: Succeeds when we get *any* HTTP response.
|
||||
|
||||
The result of the deferred is a tuple of `(code, response)`,
|
||||
where `response` is a dict representing the decoded JSON body.
|
||||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||
will be the decoded JSON body.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
@ -214,4 +210,4 @@ class _JsonProducer(object):
|
|||
pass
|
||||
|
||||
def stopProducing(self):
|
||||
pass
|
||||
pass
|
||||
|
|
73
docs/admin_api/user_admin_api.rst
Normal file
73
docs/admin_api/user_admin_api.rst
Normal file
|
@ -0,0 +1,73 @@
|
|||
Query Account
|
||||
=============
|
||||
|
||||
This API returns information about a specific user account.
|
||||
|
||||
The api is::
|
||||
|
||||
GET /_matrix/client/r0/admin/whois/<user_id>
|
||||
|
||||
including an ``access_token`` of a server admin.
|
||||
|
||||
It returns a JSON body like the following:
|
||||
|
||||
.. code:: json
|
||||
|
||||
{
|
||||
"user_id": "<user_id>",
|
||||
"devices": {
|
||||
"": {
|
||||
"sessions": [
|
||||
{
|
||||
"connections": [
|
||||
{
|
||||
"ip": "1.2.3.4",
|
||||
"last_seen": 1417222374433,
|
||||
"user_agent": "Mozilla/5.0 ..."
|
||||
},
|
||||
{
|
||||
"ip": "1.2.3.10",
|
||||
"last_seen": 1417222374500,
|
||||
"user_agent": "Dalvik/2.1.0 ..."
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
``last_seen`` is measured in milliseconds since the Unix epoch.
|
||||
|
||||
Deactivate Account
|
||||
==================
|
||||
|
||||
This API deactivates an account. It removes active access tokens, resets the
|
||||
password, and deletes third-party IDs (to prevent the user requesting a
|
||||
password reset).
|
||||
|
||||
The api is::
|
||||
|
||||
POST /_matrix/client/r0/admin/deactivate/<user_id>
|
||||
|
||||
including an ``access_token`` of a server admin, and an empty request body.
|
||||
|
||||
|
||||
Reset password
|
||||
==============
|
||||
|
||||
Changes the password of another user.
|
||||
|
||||
The api is::
|
||||
|
||||
POST /_matrix/client/r0/admin/reset_password/<user_id>
|
||||
|
||||
with a body of:
|
||||
|
||||
.. code:: json
|
||||
|
||||
{
|
||||
"new_password": "<secret>"
|
||||
}
|
||||
|
||||
including an ``access_token`` of a server admin.
|
|
@ -21,13 +21,12 @@ How to monitor Synapse metrics using Prometheus
|
|||
|
||||
3. Add a prometheus target for synapse.
|
||||
|
||||
It needs to set the ``metrics_path`` to a non-default value::
|
||||
It needs to set the ``metrics_path`` to a non-default value (under ``scrape_configs``)::
|
||||
|
||||
- job_name: "synapse"
|
||||
metrics_path: "/_synapse/metrics"
|
||||
static_configs:
|
||||
- targets:
|
||||
"my.server.here:9092"
|
||||
- targets: ["my.server.here:9092"]
|
||||
|
||||
If your prometheus is older than 1.5.2, you will need to replace
|
||||
``static_configs`` in the above with ``target_groups``.
|
||||
|
|
|
@ -474,8 +474,13 @@ class FederationClient(FederationBase):
|
|||
content (object): Any additional data to put into the content field
|
||||
of the event.
|
||||
Return:
|
||||
A tuple of (origin (str), event (object)) where origin is the remote
|
||||
homeserver which generated the event.
|
||||
Deferred: resolves to a tuple of (origin (str), event (object))
|
||||
where origin is the remote homeserver which generated the event.
|
||||
|
||||
Fails with a ``CodeMessageException`` if the chosen remote server
|
||||
returns a 300/400 code.
|
||||
|
||||
Fails with a ``RuntimeError`` if no servers were reachable.
|
||||
"""
|
||||
valid_memberships = {Membership.JOIN, Membership.LEAVE}
|
||||
if membership not in valid_memberships:
|
||||
|
@ -528,6 +533,27 @@ class FederationClient(FederationBase):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def send_join(self, destinations, pdu):
|
||||
"""Sends a join event to one of a list of homeservers.
|
||||
|
||||
Doing so will cause the remote server to add the event to the graph,
|
||||
and send the event out to the rest of the federation.
|
||||
|
||||
Args:
|
||||
destinations (str): Candidate homeservers which are probably
|
||||
participating in the room.
|
||||
pdu (BaseEvent): event to be sent
|
||||
|
||||
Return:
|
||||
Deferred: resolves to a dict with members ``origin`` (a string
|
||||
giving the serer the event was sent to, ``state`` (?) and
|
||||
``auth_chain``.
|
||||
|
||||
Fails with a ``CodeMessageException`` if the chosen remote server
|
||||
returns a 300/400 code.
|
||||
|
||||
Fails with a ``RuntimeError`` if no servers were reachable.
|
||||
"""
|
||||
|
||||
for destination in destinations:
|
||||
if destination == self.server_name:
|
||||
continue
|
||||
|
@ -635,6 +661,26 @@ class FederationClient(FederationBase):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def send_leave(self, destinations, pdu):
|
||||
"""Sends a leave event to one of a list of homeservers.
|
||||
|
||||
Doing so will cause the remote server to add the event to the graph,
|
||||
and send the event out to the rest of the federation.
|
||||
|
||||
This is mostly useful to reject received invites.
|
||||
|
||||
Args:
|
||||
destinations (str): Candidate homeservers which are probably
|
||||
participating in the room.
|
||||
pdu (BaseEvent): event to be sent
|
||||
|
||||
Return:
|
||||
Deferred: resolves to None.
|
||||
|
||||
Fails with a ``CodeMessageException`` if the chosen remote server
|
||||
returns a non-200 code.
|
||||
|
||||
Fails with a ``RuntimeError`` if no servers were reachable.
|
||||
"""
|
||||
for destination in destinations:
|
||||
if destination == self.server_name:
|
||||
continue
|
||||
|
|
|
@ -193,6 +193,26 @@ class TransportLayerClient(object):
|
|||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def make_membership_event(self, destination, room_id, user_id, membership):
|
||||
"""Asks a remote server to build and sign us a membership event
|
||||
|
||||
Note that this does not append any events to any graphs.
|
||||
|
||||
Args:
|
||||
destination (str): address of remote homeserver
|
||||
room_id (str): room to join/leave
|
||||
user_id (str): user to be joined/left
|
||||
membership (str): one of join/leave
|
||||
|
||||
Returns:
|
||||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||
will be the decoded JSON body (ie, the new event).
|
||||
|
||||
Fails with ``HTTPRequestException`` if we get an HTTP response
|
||||
code >= 300.
|
||||
|
||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||
to retry this server.
|
||||
"""
|
||||
valid_memberships = {Membership.JOIN, Membership.LEAVE}
|
||||
if membership not in valid_memberships:
|
||||
raise RuntimeError(
|
||||
|
@ -201,11 +221,23 @@ class TransportLayerClient(object):
|
|||
)
|
||||
path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
|
||||
|
||||
ignore_backoff = False
|
||||
retry_on_dns_fail = False
|
||||
|
||||
if membership == Membership.LEAVE:
|
||||
# we particularly want to do our best to send leave events. The
|
||||
# problem is that if it fails, we won't retry it later, so if the
|
||||
# remote server was just having a momentary blip, the room will be
|
||||
# out of sync.
|
||||
ignore_backoff = True
|
||||
retry_on_dns_fail = True
|
||||
|
||||
content = yield self.client.get_json(
|
||||
destination=destination,
|
||||
path=path,
|
||||
retry_on_dns_fail=False,
|
||||
retry_on_dns_fail=retry_on_dns_fail,
|
||||
timeout=20000,
|
||||
ignore_backoff=ignore_backoff,
|
||||
)
|
||||
|
||||
defer.returnValue(content)
|
||||
|
@ -232,6 +264,12 @@ class TransportLayerClient(object):
|
|||
destination=destination,
|
||||
path=path,
|
||||
data=content,
|
||||
|
||||
# we want to do our best to send this through. The problem is
|
||||
# that if it fails, we won't retry it later, so if the remote
|
||||
# server was just having a momentary blip, the room will be out of
|
||||
# sync.
|
||||
ignore_backoff=True,
|
||||
)
|
||||
|
||||
defer.returnValue(response)
|
||||
|
|
|
@ -1090,19 +1090,13 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
|
||||
try:
|
||||
origin, event = yield self._make_and_verify_event(
|
||||
target_hosts,
|
||||
room_id,
|
||||
user_id,
|
||||
"leave"
|
||||
)
|
||||
event = self._sign_event(event)
|
||||
except SynapseError:
|
||||
raise
|
||||
except CodeMessageException as e:
|
||||
logger.warn("Failed to reject invite: %s", e)
|
||||
raise SynapseError(500, "Failed to reject invite")
|
||||
origin, event = yield self._make_and_verify_event(
|
||||
target_hosts,
|
||||
room_id,
|
||||
user_id,
|
||||
"leave"
|
||||
)
|
||||
event = self._sign_event(event)
|
||||
|
||||
# Try the host that we succesfully called /make_leave/ on first for
|
||||
# the /send_leave/ request.
|
||||
|
@ -1112,16 +1106,10 @@ class FederationHandler(BaseHandler):
|
|||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
yield self.replication_layer.send_leave(
|
||||
target_hosts,
|
||||
event
|
||||
)
|
||||
except SynapseError:
|
||||
raise
|
||||
except CodeMessageException as e:
|
||||
logger.warn("Failed to reject invite: %s", e)
|
||||
raise SynapseError(500, "Failed to reject invite")
|
||||
yield self.replication_layer.send_leave(
|
||||
target_hosts,
|
||||
event
|
||||
)
|
||||
|
||||
context = yield self.state_handler.compute_event_context(event)
|
||||
|
||||
|
|
|
@ -139,13 +139,6 @@ class RoomMemberHandler(BaseHandler):
|
|||
)
|
||||
yield user_joined_room(self.distributor, user, room_id)
|
||||
|
||||
def reject_remote_invite(self, user_id, room_id, remote_room_hosts):
|
||||
return self.hs.get_handlers().federation_handler.do_remotely_reject_invite(
|
||||
remote_room_hosts,
|
||||
room_id,
|
||||
user_id
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_membership(
|
||||
self,
|
||||
|
@ -286,13 +279,21 @@ class RoomMemberHandler(BaseHandler):
|
|||
else:
|
||||
# send the rejection to the inviter's HS.
|
||||
remote_room_hosts = remote_room_hosts + [inviter.domain]
|
||||
|
||||
fed_handler = self.hs.get_handlers().federation_handler
|
||||
try:
|
||||
ret = yield self.reject_remote_invite(
|
||||
target.to_string(), room_id, remote_room_hosts
|
||||
ret = yield fed_handler.do_remotely_reject_invite(
|
||||
remote_room_hosts,
|
||||
room_id,
|
||||
target.to_string(),
|
||||
)
|
||||
defer.returnValue(ret)
|
||||
except SynapseError as e:
|
||||
except Exception as e:
|
||||
# if we were unable to reject the exception, just mark
|
||||
# it as rejected on our end and plough ahead.
|
||||
#
|
||||
# The 'except' clause is very broad, but we need to
|
||||
# capture everything from DNS failures upwards
|
||||
#
|
||||
logger.warn("Failed to reject invite: %s", e)
|
||||
|
||||
yield self.store.locally_reject_invite(
|
||||
|
|
|
@ -125,6 +125,8 @@ class MatrixFederationHttpClient(object):
|
|||
code >= 300.
|
||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||
to retry this server.
|
||||
(May also fail with plenty of other Exceptions for things like DNS
|
||||
failures, connection failures, SSL failures.)
|
||||
"""
|
||||
limiter = yield synapse.util.retryutils.get_retry_limiter(
|
||||
destination,
|
||||
|
@ -302,8 +304,10 @@ class MatrixFederationHttpClient(object):
|
|||
|
||||
Returns:
|
||||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||
will be the decoded JSON body. On a 4xx or 5xx error response a
|
||||
CodeMessageException is raised.
|
||||
will be the decoded JSON body.
|
||||
|
||||
Fails with ``HTTPRequestException`` if we get an HTTP response
|
||||
code >= 300.
|
||||
|
||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||
to retry this server.
|
||||
|
@ -360,8 +364,10 @@ class MatrixFederationHttpClient(object):
|
|||
try the request anyway.
|
||||
Returns:
|
||||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||
will be the decoded JSON body. On a 4xx or 5xx error response a
|
||||
CodeMessageException is raised.
|
||||
will be the decoded JSON body.
|
||||
|
||||
Fails with ``HTTPRequestException`` if we get an HTTP response
|
||||
code >= 300.
|
||||
|
||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||
to retry this server.
|
||||
|
@ -410,10 +416,11 @@ class MatrixFederationHttpClient(object):
|
|||
ignore_backoff (bool): true to ignore the historical backoff data
|
||||
and try the request anyway.
|
||||
Returns:
|
||||
Deferred: Succeeds when we get *any* HTTP response.
|
||||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||
will be the decoded JSON body.
|
||||
|
||||
The result of the deferred is a tuple of `(code, response)`,
|
||||
where `response` is a dict representing the decoded JSON body.
|
||||
Fails with ``HTTPRequestException`` if we get an HTTP response
|
||||
code >= 300.
|
||||
|
||||
Fails with ``NotRetryingDestination`` if we are not yet ready
|
||||
to retry this server.
|
||||
|
|
|
@ -87,7 +87,11 @@ class BulkPushRuleEvaluator:
|
|||
condition_cache = {}
|
||||
|
||||
for uid, rules in self.rules_by_user.items():
|
||||
display_name = room_members.get(uid, {}).get("display_name", None)
|
||||
display_name = None
|
||||
profile_info = room_members.get(uid)
|
||||
if profile_info:
|
||||
display_name = profile_info.display_name
|
||||
|
||||
if not display_name:
|
||||
# Handle the case where we are pushing a membership event to
|
||||
# that user, as they might not be already joined.
|
||||
|
|
|
@ -102,9 +102,6 @@ class SlavedEventStore(BaseSlavedStore):
|
|||
_get_state_groups_from_groups_txn = (
|
||||
DataStore._get_state_groups_from_groups_txn.__func__
|
||||
)
|
||||
_get_state_group_from_group = (
|
||||
StateStore.__dict__["_get_state_group_from_group"]
|
||||
)
|
||||
get_recent_event_ids_for_room = (
|
||||
StreamStore.__dict__["get_recent_event_ids_for_room"]
|
||||
)
|
||||
|
|
|
@ -406,7 +406,13 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
|
|||
users_with_profile = yield self.state.get_current_user_in_room(room_id)
|
||||
|
||||
defer.returnValue((200, {
|
||||
"joined": users_with_profile
|
||||
"joined": {
|
||||
user_id: {
|
||||
"avatar_url": profile.avatar_url,
|
||||
"display_name": profile.display_name,
|
||||
}
|
||||
for user_id, profile in users_with_profile.iteritems()
|
||||
}
|
||||
}))
|
||||
|
||||
|
||||
|
|
|
@ -84,12 +84,11 @@ class LocalKey(Resource):
|
|||
}
|
||||
|
||||
old_verify_keys = {}
|
||||
for key in self.config.old_signing_keys:
|
||||
key_id = "%s:%s" % (key.alg, key.version)
|
||||
for key_id, key in self.config.old_signing_keys.items():
|
||||
verify_key_bytes = key.encode()
|
||||
old_verify_keys[key_id] = {
|
||||
u"key": encode_base64(verify_key_bytes),
|
||||
u"expired_ts": key.expired,
|
||||
u"expired_ts": key.expired_ts,
|
||||
}
|
||||
|
||||
tls_fingerprints = self.config.tls_fingerprints
|
||||
|
|
|
@ -47,10 +47,13 @@ class ReceiptsStore(SQLBaseStore):
|
|||
# Returns an ObservableDeferred
|
||||
res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
|
||||
|
||||
if res and res.called and user_id in res.result:
|
||||
# We'd only be adding to the set, so no point invalidating if the
|
||||
# user is already there
|
||||
return
|
||||
if res:
|
||||
if isinstance(res, defer.Deferred) and res.called:
|
||||
res = res.result
|
||||
if user_id in res:
|
||||
# We'd only be adding to the set, so no point invalidating if the
|
||||
# user is already there
|
||||
return
|
||||
|
||||
self.get_users_with_read_receipts_in_room.invalidate((room_id,))
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ from collections import namedtuple
|
|||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.stringutils import to_ascii
|
||||
|
||||
from synapse.api.constants import Membership, EventTypes
|
||||
from synapse.types import get_domain_from_id
|
||||
|
@ -35,6 +36,13 @@ RoomsForUser = namedtuple(
|
|||
)
|
||||
|
||||
|
||||
# We store this using a namedtuple so that we save about 3x space over using a
|
||||
# dict.
|
||||
ProfileInfo = namedtuple(
|
||||
"ProfileInfo", ("avatar_url", "display_name")
|
||||
)
|
||||
|
||||
|
||||
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
|
||||
|
||||
|
||||
|
@ -422,20 +430,20 @@ class RoomMemberStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
users_in_room = {
|
||||
row["user_id"]: {
|
||||
"display_name": row["display_name"],
|
||||
"avatar_url": row["avatar_url"],
|
||||
}
|
||||
to_ascii(row["user_id"]): ProfileInfo(
|
||||
avatar_url=to_ascii(row["avatar_url"]),
|
||||
display_name=to_ascii(row["display_name"]),
|
||||
)
|
||||
for row in rows
|
||||
}
|
||||
|
||||
if event is not None and event.type == EventTypes.Member:
|
||||
if event.membership == Membership.JOIN:
|
||||
if event.event_id in member_event_ids:
|
||||
users_in_room[event.state_key] = {
|
||||
"display_name": event.content.get("displayname", None),
|
||||
"avatar_url": event.content.get("avatar_url", None),
|
||||
}
|
||||
users_in_room[to_ascii(event.state_key)] = ProfileInfo(
|
||||
display_name=to_ascii(event.content.get("displayname", None)),
|
||||
avatar_url=to_ascii(event.content.get("avatar_url", None)),
|
||||
)
|
||||
|
||||
defer.returnValue(users_in_room)
|
||||
|
||||
|
|
|
@ -279,12 +279,7 @@ class StateStore(SQLBaseStore):
|
|||
|
||||
return count
|
||||
|
||||
@cached(num_args=2, max_entries=100000, iterable=True)
|
||||
def _get_state_group_from_group(self, group, types):
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedList(cached_method_name="_get_state_group_from_group",
|
||||
list_name="groups", num_args=2, inlineCallbacks=True)
|
||||
@defer.inlineCallbacks
|
||||
def _get_state_groups_from_groups(self, groups, types):
|
||||
"""Returns dictionary state_group -> (dict of (type, state_key) -> event id)
|
||||
"""
|
||||
|
@ -512,7 +507,7 @@ class StateStore(SQLBaseStore):
|
|||
state_map = yield self.get_state_ids_for_events([event_id], types)
|
||||
defer.returnValue(state_map[event_id])
|
||||
|
||||
@cached(num_args=2, max_entries=100000)
|
||||
@cached(num_args=2, max_entries=50000)
|
||||
def _get_state_group_for_event(self, room_id, event_id):
|
||||
return self._simple_select_one_onecol(
|
||||
table="event_to_state_groups",
|
||||
|
|
|
@ -14,13 +14,10 @@
|
|||
# limitations under the License.
|
||||
|
||||
import synapse.metrics
|
||||
from lrucache import LruCache
|
||||
import os
|
||||
|
||||
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
|
||||
|
||||
DEBUG_CACHES = False
|
||||
|
||||
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
|
||||
|
||||
caches_by_name = {}
|
||||
|
@ -40,10 +37,6 @@ def register_cache(name, cache):
|
|||
)
|
||||
|
||||
|
||||
_string_cache = LruCache(int(100000 * CACHE_SIZE_FACTOR))
|
||||
_stirng_cache_metrics = register_cache("string_cache", _string_cache)
|
||||
|
||||
|
||||
KNOWN_KEYS = {
|
||||
key: key for key in
|
||||
(
|
||||
|
@ -67,14 +60,16 @@ KNOWN_KEYS = {
|
|||
|
||||
|
||||
def intern_string(string):
|
||||
"""Takes a (potentially) unicode string and interns using custom cache
|
||||
"""Takes a (potentially) unicode string and interns it if it's ascii
|
||||
"""
|
||||
new_str = _string_cache.setdefault(string, string)
|
||||
if new_str is string:
|
||||
_stirng_cache_metrics.inc_hits()
|
||||
else:
|
||||
_stirng_cache_metrics.inc_misses()
|
||||
return new_str
|
||||
if string is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
string = string.encode("ascii")
|
||||
return intern(string)
|
||||
except UnicodeEncodeError:
|
||||
return string
|
||||
|
||||
|
||||
def intern_dict(dictionary):
|
||||
|
@ -87,13 +82,9 @@ def intern_dict(dictionary):
|
|||
|
||||
|
||||
def _intern_known_values(key, value):
|
||||
intern_str_keys = ("event_id", "room_id")
|
||||
intern_unicode_keys = ("sender", "user_id", "type", "state_key")
|
||||
intern_keys = ("event_id", "room_id", "sender", "user_id", "type", "state_key",)
|
||||
|
||||
if key in intern_str_keys:
|
||||
return intern(value.encode('ascii'))
|
||||
|
||||
if key in intern_unicode_keys:
|
||||
if key in intern_keys:
|
||||
return intern_string(value)
|
||||
|
||||
return value
|
||||
|
|
|
@ -19,7 +19,7 @@ from synapse.util import unwrapFirstError, logcontext
|
|||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
|
||||
|
||||
from . import DEBUG_CACHES, register_cache
|
||||
from . import register_cache
|
||||
|
||||
from twisted.internet import defer
|
||||
from collections import namedtuple
|
||||
|
@ -76,7 +76,7 @@ class Cache(object):
|
|||
|
||||
self.cache = LruCache(
|
||||
max_size=max_entries, keylen=keylen, cache_type=cache_type,
|
||||
size_callback=(lambda d: len(d.result)) if iterable else None,
|
||||
size_callback=(lambda d: len(d)) if iterable else None,
|
||||
)
|
||||
|
||||
self.name = name
|
||||
|
@ -96,6 +96,17 @@ class Cache(object):
|
|||
)
|
||||
|
||||
def get(self, key, default=_CacheSentinel, callback=None):
|
||||
"""Looks the key up in the caches.
|
||||
|
||||
Args:
|
||||
key(tuple)
|
||||
default: What is returned if key is not in the caches. If not
|
||||
specified then function throws KeyError instead
|
||||
callback(fn): Gets called when the entry in the cache is invalidated
|
||||
|
||||
Returns:
|
||||
Either a Deferred or the raw result
|
||||
"""
|
||||
callbacks = [callback] if callback else []
|
||||
val = self._pending_deferred_cache.get(key, _CacheSentinel)
|
||||
if val is not _CacheSentinel:
|
||||
|
@ -137,7 +148,7 @@ class Cache(object):
|
|||
if self.sequence == entry.sequence:
|
||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if existing_entry is entry:
|
||||
self.cache.set(key, entry.deferred, entry.callbacks)
|
||||
self.cache.set(key, result, entry.callbacks)
|
||||
else:
|
||||
entry.invalidate()
|
||||
else:
|
||||
|
@ -335,20 +346,10 @@ class CacheDescriptor(_CacheDescriptorBase):
|
|||
try:
|
||||
cached_result_d = cache.get(cache_key, callback=invalidate_callback)
|
||||
|
||||
observer = cached_result_d.observe()
|
||||
if DEBUG_CACHES:
|
||||
@defer.inlineCallbacks
|
||||
def check_result(cached_result):
|
||||
actual_result = yield self.function_to_call(obj, *args, **kwargs)
|
||||
if actual_result != cached_result:
|
||||
logger.error(
|
||||
"Stale cache entry %s%r: cached: %r, actual %r",
|
||||
self.orig.__name__, cache_key,
|
||||
cached_result, actual_result,
|
||||
)
|
||||
raise ValueError("Stale cache entry")
|
||||
defer.returnValue(cached_result)
|
||||
observer.addCallback(check_result)
|
||||
if isinstance(cached_result_d, ObservableDeferred):
|
||||
observer = cached_result_d.observe()
|
||||
else:
|
||||
observer = cached_result_d
|
||||
|
||||
except KeyError:
|
||||
ret = defer.maybeDeferred(
|
||||
|
@ -447,7 +448,9 @@ class CacheListDescriptor(_CacheDescriptorBase):
|
|||
|
||||
try:
|
||||
res = cache.get(tuple(key), callback=invalidate_callback)
|
||||
if not res.has_succeeded():
|
||||
if not isinstance(res, ObservableDeferred):
|
||||
results[arg] = res
|
||||
elif not res.has_succeeded():
|
||||
res = res.observe()
|
||||
res.addCallback(lambda r, arg: (arg, r), arg)
|
||||
cached_defers[arg] = res
|
||||
|
|
|
@ -40,3 +40,17 @@ def is_ascii(s):
|
|||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def to_ascii(s):
|
||||
"""Converts a string to ascii if it is ascii, otherwise leave it alone.
|
||||
|
||||
If given None then will return None.
|
||||
"""
|
||||
if s is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
return s.encode("ascii")
|
||||
except UnicodeEncodeError:
|
||||
return s
|
||||
|
|
Loading…
Reference in a new issue