0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-14 18:53:53 +01:00

Merge branch 'develop' into auth

This commit is contained in:
Daniel Wagner-Hall 2015-08-18 14:43:44 +01:00
commit 1469141023
38 changed files with 1352 additions and 461 deletions

View file

@ -23,7 +23,6 @@ for port in 8080 8081 8082; do
#rm $DIR/etc/$port.config
python -m synapse.app.homeserver \
--generate-config \
--enable_registration \
-H "localhost:$https_port" \
--config-path "$DIR/etc/$port.config" \
@ -36,6 +35,8 @@ for port in 8080 8081 8082; do
fi
fi
perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config
python -m synapse.app.homeserver \
--config-path "$DIR/etc/$port.config" \
-D \

View file

@ -16,3 +16,6 @@ ignore =
docs/*
pylint.cfg
tox.ini
[flake8]
max-line-length = 90

View file

@ -48,7 +48,7 @@ setup(
description="Reference Synapse Home Server",
install_requires=dependencies['requirements'](include_conditional=True).keys(),
setup_requires=[
"Twisted==14.0.2", # Here to override setuptools_trial's dependency on Twisted>=2.4.0
"Twisted>=15.1.0", # Here to override setuptools_trial's dependency on Twisted>=2.4.0
"setuptools_trial",
"mock"
],

View file

@ -23,7 +23,7 @@ from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
)
from synapse.util import unwrapFirstError
from synapse.util.expiringcache import ExpiringCache
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
import synapse.metrics
@ -134,6 +134,36 @@ class FederationClient(FederationBase):
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
)
@log_function
def query_client_keys(self, destination, content):
"""Query device keys for a device hosted on a remote server.
Args:
destination (str): Domain name of the remote homeserver
content (dict): The query content.
Returns:
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc("client_device_keys")
return self.transport_layer.query_client_keys(destination, content)
@log_function
def claim_client_keys(self, destination, content):
"""Claims one-time keys for a device hosted on a remote server.
Args:
destination (str): Domain name of the remote homeserver
content (dict): The query content.
Returns:
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc("client_one_time_keys")
return self.transport_layer.claim_client_keys(destination, content)
@defer.inlineCallbacks
@log_function
def backfill(self, dest, context, limit, extremities):

View file

@ -27,6 +27,7 @@ from synapse.api.errors import FederationError, SynapseError
from synapse.crypto.event_signing import compute_event_signature
import simplejson as json
import logging
@ -312,6 +313,48 @@ class FederationServer(FederationBase):
(200, send_content)
)
@defer.inlineCallbacks
@log_function
def on_query_client_keys(self, origin, content):
query = []
for user_id, device_ids in content.get("device_keys", {}).items():
if not device_ids:
query.append((user_id, None))
else:
for device_id in device_ids:
query.append((user_id, device_id))
results = yield self.store.get_e2e_device_keys(query)
json_result = {}
for user_id, device_keys in results.items():
for device_id, json_bytes in device_keys.items():
json_result.setdefault(user_id, {})[device_id] = json.loads(
json_bytes
)
defer.returnValue({"device_keys": json_result})
@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
query = []
for user_id, device_keys in content.get("one_time_keys", {}).items():
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))
results = yield self.store.claim_e2e_one_time_keys(query)
json_result = {}
for user_id, device_keys in results.items():
for device_id, keys in device_keys.items():
for key_id, json_bytes in keys.items():
json_result.setdefault(user_id, {})[device_id] = {
key_id: json.loads(json_bytes)
}
defer.returnValue({"one_time_keys": json_result})
@defer.inlineCallbacks
@log_function
def on_get_missing_events(self, origin, room_id, earliest_events,

View file

@ -222,6 +222,76 @@ class TransportLayerClient(object):
defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def query_client_keys(self, destination, query_content):
"""Query the device keys for a list of user ids hosted on a remote
server.
Request:
{
"device_keys": {
"<user_id>": ["<device_id>"]
} }
Response:
{
"device_keys": {
"<user_id>": {
"<device_id>": {...}
} } }
Args:
destination(str): The server to query.
query_content(dict): The user ids to query.
Returns:
A dict containg the device keys.
"""
path = PREFIX + "/user/keys/query"
content = yield self.client.post_json(
destination=destination,
path=path,
data=query_content,
)
defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def claim_client_keys(self, destination, query_content):
"""Claim one-time keys for a list of devices hosted on a remote server.
Request:
{
"one_time_keys": {
"<user_id>": {
"<device_id>": "<algorithm>"
} } }
Response:
{
"device_keys": {
"<user_id>": {
"<device_id>": {
"<algorithm>:<key_id>": "<key_base64>"
} } } }
Args:
destination(str): The server to query.
query_content(dict): The user ids to query.
Returns:
A dict containg the one-time keys.
"""
path = PREFIX + "/user/keys/claim"
content = yield self.client.post_json(
destination=destination,
path=path,
data=query_content,
)
defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def get_missing_events(self, destination, room_id, earliest_events,

View file

@ -325,6 +325,24 @@ class FederationInviteServlet(BaseFederationServlet):
defer.returnValue((200, content))
class FederationClientKeysQueryServlet(BaseFederationServlet):
PATH = "/user/keys/query"
@defer.inlineCallbacks
def on_POST(self, origin, content, query):
response = yield self.handler.on_query_client_keys(origin, content)
defer.returnValue((200, response))
class FederationClientKeysClaimServlet(BaseFederationServlet):
PATH = "/user/keys/claim"
@defer.inlineCallbacks
def on_POST(self, origin, content, query):
response = yield self.handler.on_claim_client_keys(origin, content)
defer.returnValue((200, response))
class FederationQueryAuthServlet(BaseFederationServlet):
PATH = "/query_auth/([^/]*)/([^/]*)"
@ -373,4 +391,6 @@ SERVLET_CLASSES = (
FederationQueryAuthServlet,
FederationGetMissingEventsServlet,
FederationEventAuthServlet,
FederationClientKeysQueryServlet,
FederationClientKeysClaimServlet,
)

View file

@ -229,15 +229,15 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
states = yield self.store.get_state_for_events(
room_id, [e.event_id for e in events],
event_to_state = yield self.store.get_state_for_events(
room_id, frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, None),
)
)
events_and_states = zip(events, states)
def redact_disallowed(event_and_state):
event, state = event_and_state
def redact_disallowed(event, state):
if not state:
return event
@ -271,11 +271,10 @@ class FederationHandler(BaseHandler):
return event
res = map(redact_disallowed, events_and_states)
logger.info("_filter_events_for_server %r", res)
defer.returnValue(res)
defer.returnValue([
redact_disallowed(e, event_to_state[e.event_id])
for e in events
])
@log_function
@defer.inlineCallbacks
@ -503,7 +502,7 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
states = yield defer.gatherResults([
self.state_handler.resolve_state_groups([e])
self.state_handler.resolve_state_groups(room_id, [e])
for e in event_ids
])
states = dict(zip(event_ids, [s[1] for s in states]))

View file

@ -137,15 +137,15 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def _filter_events_for_client(self, user_id, room_id, events):
states = yield self.store.get_state_for_events(
room_id, [e.event_id for e in events],
event_id_to_state = yield self.store.get_state_for_events(
room_id, frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, user_id),
)
)
events_and_states = zip(events, states)
def allowed(event_and_state):
event, state = event_and_state
def allowed(event, state):
if event.type == EventTypes.RoomHistoryVisibility:
return True
@ -175,10 +175,10 @@ class MessageHandler(BaseHandler):
return True
events_and_states = filter(allowed, events_and_states)
defer.returnValue([
ev
for ev, _ in events_and_states
event
for event in events
if allowed(event, event_id_to_state[event.event_id])
])
@defer.inlineCallbacks
@ -401,8 +401,12 @@ class MessageHandler(BaseHandler):
except:
logger.exception("Failed to get snapshot")
# Only do N rooms at once
n = 5
d_list = [handle_room(e) for e in room_list]
for i in range(0, len(d_list), n):
yield defer.gatherResults(
[handle_room(e) for e in room_list],
d_list[i:i + n],
consumeErrors=True
).addErrback(unwrapFirstError)
@ -456,20 +460,14 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
presence_defs = yield defer.DeferredList(
[
presence_handler.get_state(
target_user=UserID.from_string(m.user_id),
states = yield presence_handler.get_states(
target_users=[UserID.from_string(m.user_id) for m in room_members],
auth_user=auth_user,
as_event=True,
check_auth=False,
)
for m in room_members
],
consumeErrors=True,
)
defer.returnValue([p for success, p in presence_defs if success])
defer.returnValue(states.values())
receipts_handler = self.hs.get_handlers().receipts_handler

View file

@ -192,6 +192,20 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def get_state(self, target_user, auth_user, as_event=False, check_auth=True):
"""Get the current presence state of the given user.
Args:
target_user (UserID): The user whose presence we want
auth_user (UserID): The user requesting the presence, used for
checking if said user is allowed to see the persence of the
`target_user`
as_event (bool): Format the return as an event or not?
check_auth (bool): Perform the auth checks or not?
Returns:
dict: The presence state of the `target_user`, whose format depends
on the `as_event` argument.
"""
if self.hs.is_mine(target_user):
if check_auth:
visible = yield self.is_presence_visible(
@ -232,6 +246,81 @@ class PresenceHandler(BaseHandler):
else:
defer.returnValue(state)
@defer.inlineCallbacks
def get_states(self, target_users, auth_user, as_event=False, check_auth=True):
"""A batched version of the `get_state` method that accepts a list of
`target_users`
Args:
target_users (list): The list of UserID's whose presence we want
auth_user (UserID): The user requesting the presence, used for
checking if said user is allowed to see the persence of the
`target_users`
as_event (bool): Format the return as an event or not?
check_auth (bool): Perform the auth checks or not?
Returns:
dict: A mapping from user -> presence_state
"""
local_users, remote_users = partitionbool(
target_users,
lambda u: self.hs.is_mine(u)
)
if check_auth:
for user in local_users:
visible = yield self.is_presence_visible(
observer_user=auth_user,
observed_user=user
)
if not visible:
raise SynapseError(404, "Presence information not visible")
results = {}
if local_users:
for user in local_users:
if user in self._user_cachemap:
results[user] = self._user_cachemap[user].get_state()
local_to_user = {u.localpart: u for u in local_users}
states = yield self.store.get_presence_states(
[u.localpart for u in local_users if u not in results]
)
for local_part, state in states.items():
if state is None:
continue
res = {"presence": state["state"]}
if "status_msg" in state and state["status_msg"]:
res["status_msg"] = state["status_msg"]
results[local_to_user[local_part]] = res
for user in remote_users:
# TODO(paul): Have remote server send us permissions set
results[user] = self._get_or_offline_usercache(user).get_state()
for state in results.values():
if "last_active" in state:
state["last_active_ago"] = int(
self.clock.time_msec() - state.pop("last_active")
)
if as_event:
for user, state in results.items():
content = state
content["user_id"] = user.to_string()
if "last_active" in content:
content["last_active_ago"] = int(
self._clock.time_msec() - content.pop("last_active")
)
results[user] = {"type": "m.presence", "content": content}
defer.returnValue(results)
@defer.inlineCallbacks
@log_function
def set_state(self, target_user, auth_user, state):

View file

@ -294,15 +294,15 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def _filter_events_for_client(self, user_id, room_id, events):
states = yield self.store.get_state_for_events(
room_id, [e.event_id for e in events],
event_id_to_state = yield self.store.get_state_for_events(
room_id, frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, user_id),
)
)
events_and_states = zip(events, states)
def allowed(event_and_state):
event, state = event_and_state
def allowed(event, state):
if event.type == EventTypes.RoomHistoryVisibility:
return True
@ -331,10 +331,11 @@ class SyncHandler(BaseHandler):
return membership == Membership.INVITE
return True
events_and_states = filter(allowed, events_and_states)
defer.returnValue([
ev
for ev, _ in events_and_states
event
for event in events
if allowed(event, event_id_to_state[event.event_id])
])
@defer.inlineCallbacks

View file

@ -16,7 +16,7 @@
from twisted.internet import defer, reactor, protocol
from twisted.internet.error import DNSLookupError
from twisted.web.client import readBody, _AgentBase, _URI, HTTPConnectionPool
from twisted.web.client import readBody, HTTPConnectionPool, Agent
from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone
@ -55,41 +55,17 @@ incoming_responses_counter = metrics.register_counter(
)
class MatrixFederationHttpAgent(_AgentBase):
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.tls_context_factory = hs.tls_context_factory
def __init__(self, reactor, pool=None):
_AgentBase.__init__(self, reactor, pool)
def endpointForURI(self, uri):
destination = uri.netloc
def request(self, destination, endpoint, method, path, params, query,
headers, body_producer):
outgoing_requests_counter.inc(method)
host = b""
port = 0
fragment = b""
parsed_URI = _URI(b"http", destination, host, port, path, params,
query, fragment)
# Set the connection pool key to be the destination.
key = destination
d = self._requestWithEndpoint(key, endpoint, method, parsed_URI,
headers, body_producer,
parsed_URI.originForm)
def _cb(response):
incoming_responses_counter.inc(method, response.code)
return response
def _eb(failure):
incoming_responses_counter.inc(method, "ERR")
return failure
d.addCallbacks(_cb, _eb)
return d
return matrix_federation_endpoint(
reactor, destination, timeout=10,
ssl_context_factory=self.tls_context_factory
)
class MatrixFederationHttpClient(object):
@ -107,12 +83,18 @@ class MatrixFederationHttpClient(object):
self.server_name = hs.hostname
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 10
self.agent = MatrixFederationHttpAgent(reactor, pool=pool)
self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
)
self.clock = hs.get_clock()
self.version_string = hs.version_string
self._next_id = 1
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
return urlparse.urlunparse(
("matrix", destination, path_bytes, param_bytes, query_bytes, "")
)
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"",
@ -123,8 +105,8 @@ class MatrixFederationHttpClient(object):
headers_dict[b"User-Agent"] = [self.version_string]
headers_dict[b"Host"] = [destination]
url_bytes = urlparse.urlunparse(
("", "", path_bytes, param_bytes, query_bytes, "",)
url_bytes = self._create_url(
destination, path_bytes, param_bytes, query_bytes
)
txn_id = "%s-O-%s" % (method, self._next_id)
@ -139,8 +121,8 @@ class MatrixFederationHttpClient(object):
# (once we have reliable transactions in place)
retries_left = 5
endpoint = preserve_context_over_fn(
self._getEndpoint, reactor, destination
http_url_bytes = urlparse.urlunparse(
("", "", path_bytes, param_bytes, query_bytes, "")
)
log_result = None
@ -148,17 +130,14 @@ class MatrixFederationHttpClient(object):
while True:
producer = None
if body_callback:
producer = body_callback(method, url_bytes, headers_dict)
producer = body_callback(method, http_url_bytes, headers_dict)
try:
def send_request():
request_deferred = self.agent.request(
destination,
endpoint,
request_deferred = preserve_context_over_fn(
self.agent.request,
method,
path_bytes,
param_bytes,
query_bytes,
url_bytes,
Headers(headers_dict),
producer
)
@ -452,12 +431,6 @@ class MatrixFederationHttpClient(object):
defer.returnValue((length, headers))
def _getEndpoint(self, reactor, destination):
return matrix_federation_endpoint(
reactor, destination, timeout=10,
ssl_context_factory=self.hs.tls_context_factory
)
class _ReadBodyToFileProtocol(protocol.Protocol):
def __init__(self, stream, deferred, max_size):

View file

@ -18,8 +18,12 @@ from __future__ import absolute_import
import logging
from resource import getrusage, getpagesize, RUSAGE_SELF
import functools
import os
import stat
import time
from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
@ -144,3 +148,28 @@ def _process_fds():
return counts
get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"])
reactor_metrics = get_metrics_for("reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
def runUntilCurrentTimer(func):
@functools.wraps(func)
def f(*args, **kwargs):
pending_calls = len(reactor.getDelayedCalls())
start = time.time() * 1000
ret = func(*args, **kwargs)
end = time.time() * 1000
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(pending_calls)
return ret
return f
if hasattr(reactor, "runUntilCurrent"):
# runUntilCurrent is called when we have pending calls. It is called once
# per iteratation after fd polling.
reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)

View file

@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
REQUIREMENTS = {
"syutil>=0.0.7": ["syutil>=0.0.7"],
"Twisted==14.0.2": ["twisted==14.0.2"],
"Twisted>=15.1.0": ["twisted>=15.1.0"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
"pyyaml": ["yaml"],

View file

@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet
from synapse.types import UserID
from syutil.jsonutil import encode_canonical_json
from ._base import client_v2_pattern
@ -164,45 +165,63 @@ class KeyQueryServlet(RestServlet):
super(KeyQueryServlet, self).__init__()
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.federation = hs.get_replication_layer()
self.is_mine = hs.is_mine
@defer.inlineCallbacks
def on_POST(self, request, user_id, device_id):
logger.debug("onPOST")
yield self.auth.get_user_by_req(request)
try:
body = json.loads(request.content.read())
except:
raise SynapseError(400, "Invalid key JSON")
query = []
for user_id, device_ids in body.get("device_keys", {}).items():
if not device_ids:
query.append((user_id, None))
else:
for device_id in device_ids:
query.append((user_id, device_id))
results = yield self.store.get_e2e_device_keys(query)
defer.returnValue(self.json_result(request, results))
result = yield self.handle_request(body)
defer.returnValue(result)
@defer.inlineCallbacks
def on_GET(self, request, user_id, device_id):
auth_user, client_info = yield self.auth.get_user_by_req(request)
auth_user_id = auth_user.to_string()
if not user_id:
user_id = auth_user_id
if not device_id:
device_id = None
# Returns a map of user_id->device_id->json_bytes.
results = yield self.store.get_e2e_device_keys([(user_id, device_id)])
defer.returnValue(self.json_result(request, results))
user_id = user_id if user_id else auth_user_id
device_ids = [device_id] if device_id else []
result = yield self.handle_request(
{"device_keys": {user_id: device_ids}}
)
defer.returnValue(result)
@defer.inlineCallbacks
def handle_request(self, body):
local_query = []
remote_queries = {}
for user_id, device_ids in body.get("device_keys", {}).items():
user = UserID.from_string(user_id)
if self.is_mine(user):
if not device_ids:
local_query.append((user_id, None))
else:
for device_id in device_ids:
local_query.append((user_id, device_id))
else:
remote_queries.setdefault(user.domain, {})[user_id] = list(
device_ids
)
results = yield self.store.get_e2e_device_keys(local_query)
def json_result(self, request, results):
json_result = {}
for user_id, device_keys in results.items():
for device_id, json_bytes in device_keys.items():
json_result.setdefault(user_id, {})[device_id] = json.loads(
json_bytes
)
return (200, {"device_keys": json_result})
for destination, device_keys in remote_queries.items():
remote_result = yield self.federation.query_client_keys(
destination, {"device_keys": device_keys}
)
for user_id, keys in remote_result["device_keys"].items():
if user_id in device_keys:
json_result[user_id] = keys
defer.returnValue((200, {"device_keys": json_result}))
class OneTimeKeyServlet(RestServlet):
@ -236,14 +255,16 @@ class OneTimeKeyServlet(RestServlet):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.federation = hs.get_replication_layer()
self.is_mine = hs.is_mine
@defer.inlineCallbacks
def on_GET(self, request, user_id, device_id, algorithm):
yield self.auth.get_user_by_req(request)
results = yield self.store.claim_e2e_one_time_keys(
[(user_id, device_id, algorithm)]
result = yield self.handle_request(
{"one_time_keys": {user_id: {device_id: algorithm}}}
)
defer.returnValue(self.json_result(request, results))
defer.returnValue(result)
@defer.inlineCallbacks
def on_POST(self, request, user_id, device_id, algorithm):
@ -252,14 +273,24 @@ class OneTimeKeyServlet(RestServlet):
body = json.loads(request.content.read())
except:
raise SynapseError(400, "Invalid key JSON")
query = []
for user_id, device_keys in body.get("one_time_keys", {}).items():
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))
results = yield self.store.claim_e2e_one_time_keys(query)
defer.returnValue(self.json_result(request, results))
result = yield self.handle_request(body)
defer.returnValue(result)
@defer.inlineCallbacks
def handle_request(self, body):
local_query = []
remote_queries = {}
for user_id, device_keys in body.get("one_time_keys", {}).items():
user = UserID.from_string(user_id)
if self.is_mine(user):
for device_id, algorithm in device_keys.items():
local_query.append((user_id, device_id, algorithm))
else:
remote_queries.setdefault(user.domain, {})[user_id] = (
device_keys
)
results = yield self.store.claim_e2e_one_time_keys(local_query)
def json_result(self, request, results):
json_result = {}
for user_id, device_keys in results.items():
for device_id, keys in device_keys.items():
@ -267,7 +298,16 @@ class OneTimeKeyServlet(RestServlet):
json_result.setdefault(user_id, {})[device_id] = {
key_id: json.loads(json_bytes)
}
return (200, {"one_time_keys": json_result})
for destination, device_keys in remote_queries.items():
remote_result = yield self.federation.claim_client_keys(
destination, {"one_time_keys": device_keys}
)
for user_id, keys in remote_result["one_time_keys"].items():
if user_id in device_keys:
json_result[user_id] = keys
defer.returnValue((200, {"one_time_keys": json_result}))
def register_servlets(hs, http_server):

View file

@ -18,7 +18,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.expiringcache import ExpiringCache
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.api.auth import AuthEventTypes
@ -96,7 +96,7 @@ class StateHandler(object):
cache.ts = self.clock.time_msec()
state = cache.state
else:
res = yield self.resolve_state_groups(event_ids)
res = yield self.resolve_state_groups(room_id, event_ids)
state = res[1]
if event_type:
@ -155,13 +155,13 @@ class StateHandler(object):
if event.is_state():
ret = yield self.resolve_state_groups(
[e for e, _ in event.prev_events],
event.room_id, [e for e, _ in event.prev_events],
event_type=event.type,
state_key=event.state_key,
)
else:
ret = yield self.resolve_state_groups(
[e for e, _ in event.prev_events],
event.room_id, [e for e, _ in event.prev_events],
)
group, curr_state, prev_state = ret
@ -180,7 +180,7 @@ class StateHandler(object):
@defer.inlineCallbacks
@log_function
def resolve_state_groups(self, event_ids, event_type=None, state_key=""):
def resolve_state_groups(self, room_id, event_ids, event_type=None, state_key=""):
""" Given a list of event_ids this method fetches the state at each
event, resolves conflicts between them and returns them.
@ -205,7 +205,7 @@ class StateHandler(object):
)
state_groups = yield self.store.get_state_groups(
event_ids
room_id, event_ids
)
logger.debug(

View file

@ -15,25 +15,22 @@
import logging
from synapse.api.errors import StoreError
from synapse.util.async import ObservableDeferred
from synapse.util.logutils import log_function
from synapse.util.logcontext import preserve_context_over_fn, LoggingContext
from synapse.util.lrucache import LruCache
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.caches.descriptors import Cache
import synapse.metrics
from util.id_generators import IdGenerator, StreamIdGenerator
from twisted.internet import defer
from collections import namedtuple, OrderedDict
from collections import namedtuple
import functools
import inspect
import sys
import time
import threading
DEBUG_CACHES = False
logger = logging.getLogger(__name__)
@ -49,208 +46,6 @@ sql_scheduling_timer = metrics.register_distribution("schedule_time")
sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
caches_by_name = {}
cache_counter = metrics.register_cache(
"cache",
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
labels=["name"],
)
_CacheSentinel = object()
class Cache(object):
def __init__(self, name, max_entries=1000, keylen=1, lru=True):
if lru:
self.cache = LruCache(max_size=max_entries)
self.max_entries = None
else:
self.cache = OrderedDict()
self.max_entries = max_entries
self.name = name
self.keylen = keylen
self.sequence = 0
self.thread = None
caches_by_name[name] = self.cache
def check_thread(self):
expected_thread = self.thread
if expected_thread is None:
self.thread = threading.current_thread()
else:
if expected_thread is not threading.current_thread():
raise ValueError(
"Cache objects can only be accessed from the main thread"
)
def get(self, key, default=_CacheSentinel):
val = self.cache.get(key, _CacheSentinel)
if val is not _CacheSentinel:
cache_counter.inc_hits(self.name)
return val
cache_counter.inc_misses(self.name)
if default is _CacheSentinel:
raise KeyError()
else:
return default
def update(self, sequence, key, value):
self.check_thread()
if self.sequence == sequence:
# Only update the cache if the caches sequence number matches the
# number that the cache had before the SELECT was started (SYN-369)
self.prefill(key, value)
def prefill(self, key, value):
if self.max_entries is not None:
while len(self.cache) >= self.max_entries:
self.cache.popitem(last=False)
self.cache[key] = value
def invalidate(self, key):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError(
"The cache key must be a tuple not %r" % (type(key),)
)
# Increment the sequence number so that any SELECT statements that
# raced with the INSERT don't update the cache (SYN-369)
self.sequence += 1
self.cache.pop(key, None)
def invalidate_all(self):
self.check_thread()
self.sequence += 1
self.cache.clear()
class CacheDescriptor(object):
""" A method decorator that applies a memoizing cache around the function.
This caches deferreds, rather than the results themselves. Deferreds that
fail are removed from the cache.
The function is presumed to take zero or more arguments, which are used in
a tuple as the key for the cache. Hits are served directly from the cache;
misses use the function body to generate the value.
The wrapped function has an additional member, a callable called
"invalidate". This can be used to remove individual entries from the cache.
The wrapped function has another additional callable, called "prefill",
which can be used to insert values into the cache specifically, without
calling the calculation function.
"""
def __init__(self, orig, max_entries=1000, num_args=1, lru=True,
inlineCallbacks=False):
self.orig = orig
if inlineCallbacks:
self.function_to_call = defer.inlineCallbacks(orig)
else:
self.function_to_call = orig
self.max_entries = max_entries
self.num_args = num_args
self.lru = lru
self.arg_names = inspect.getargspec(orig).args[1:num_args+1]
if len(self.arg_names) < self.num_args:
raise Exception(
"Not enough explicit positional arguments to key off of for %r."
" (@cached cannot key off of *args or **kwars)"
% (orig.__name__,)
)
self.cache = Cache(
name=self.orig.__name__,
max_entries=self.max_entries,
keylen=self.num_args,
lru=self.lru,
)
def __get__(self, obj, objtype=None):
@functools.wraps(self.orig)
def wrapped(*args, **kwargs):
arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs)
cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names)
try:
cached_result_d = self.cache.get(cache_key)
observer = cached_result_d.observe()
if DEBUG_CACHES:
@defer.inlineCallbacks
def check_result(cached_result):
actual_result = yield self.function_to_call(obj, *args, **kwargs)
if actual_result != cached_result:
logger.error(
"Stale cache entry %s%r: cached: %r, actual %r",
self.orig.__name__, cache_key,
cached_result, actual_result,
)
raise ValueError("Stale cache entry")
defer.returnValue(cached_result)
observer.addCallback(check_result)
return observer
except KeyError:
# Get the sequence number of the cache before reading from the
# database so that we can tell if the cache is invalidated
# while the SELECT is executing (SYN-369)
sequence = self.cache.sequence
ret = defer.maybeDeferred(
self.function_to_call,
obj, *args, **kwargs
)
def onErr(f):
self.cache.invalidate(cache_key)
return f
ret.addErrback(onErr)
ret = ObservableDeferred(ret, consumeErrors=True)
self.cache.update(sequence, cache_key, ret)
return ret.observe()
wrapped.invalidate = self.cache.invalidate
wrapped.invalidate_all = self.cache.invalidate_all
wrapped.prefill = self.cache.prefill
obj.__dict__[self.orig.__name__] = wrapped
return wrapped
def cached(max_entries=1000, num_args=1, lru=True):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
num_args=num_args,
lru=lru
)
def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
num_args=num_args,
lru=lru,
inlineCallbacks=True,
)
class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
@ -372,6 +167,8 @@ class SQLBaseStore(object):
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
max_entries=hs.config.event_cache_size)
self._state_group_cache = DictionaryCache("*stateGroupCache*", 100000)
self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
self._event_fetch_ongoing = 0

View file

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, cached
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from synapse.api.errors import SynapseError

View file

@ -15,7 +15,8 @@
from twisted.internet import defer
from ._base import SQLBaseStore, cached
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from syutil.base64util import encode_base64
import logging

View file

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from _base import SQLBaseStore, cachedInlineCallbacks
from _base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
from twisted.internet import defer

View file

@ -13,19 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, cached
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedList
from twisted.internet import defer
class PresenceStore(SQLBaseStore):
def create_presence(self, user_localpart):
return self._simple_insert(
res = self._simple_insert(
table="presence",
values={"user_id": user_localpart},
desc="create_presence",
)
self.get_presence_state.invalidate((user_localpart,))
return res
def has_presence_state(self, user_localpart):
return self._simple_select_one(
table="presence",
@ -35,6 +39,7 @@ class PresenceStore(SQLBaseStore):
desc="has_presence_state",
)
@cached()
def get_presence_state(self, user_localpart):
return self._simple_select_one(
table="presence",
@ -43,8 +48,27 @@ class PresenceStore(SQLBaseStore):
desc="get_presence_state",
)
@cachedList(get_presence_state.cache, list_name="user_localparts")
def get_presence_states(self, user_localparts):
def f(txn):
results = {}
for user_localpart in user_localparts:
res = self._simple_select_one_txn(
txn,
table="presence",
keyvalues={"user_id": user_localpart},
retcols=["state", "status_msg", "mtime"],
allow_none=True,
)
if res:
results[user_localpart] = res
return results
return self.runInteraction("get_presence_states", f)
def set_presence_state(self, user_localpart, new_state):
return self._simple_update_one(
res = self._simple_update_one(
table="presence",
keyvalues={"user_id": user_localpart},
updatevalues={"state": new_state["state"],
@ -53,6 +77,9 @@ class PresenceStore(SQLBaseStore):
desc="set_presence_state",
)
self.get_presence_state.invalidate((user_localpart,))
return res
def allow_presence_visible(self, observed_localpart, observer_userid):
return self._simple_insert(
table="presence_allow_inbound",

View file

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, cachedInlineCallbacks
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
from twisted.internet import defer
import logging

View file

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, cachedInlineCallbacks
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
from twisted.internet import defer

View file

@ -17,7 +17,8 @@ from twisted.internet import defer
from synapse.api.errors import StoreError, Codes
from ._base import SQLBaseStore, cached
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
class RegistrationStore(SQLBaseStore):

View file

@ -17,7 +17,8 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from ._base import SQLBaseStore, cachedInlineCallbacks
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
import collections
import logging

View file

@ -17,7 +17,8 @@ from twisted.internet import defer
from collections import namedtuple
from ._base import SQLBaseStore, cached
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from synapse.api.constants import Membership
from synapse.types import UserID

View file

@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, cachedInlineCallbacks
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import (
cached, cachedInlineCallbacks, cachedList
)
from twisted.internet import defer
@ -44,59 +47,25 @@ class StateStore(SQLBaseStore):
"""
@defer.inlineCallbacks
def get_state_groups(self, event_ids):
def get_state_groups(self, room_id, event_ids):
""" Get the state groups for the given list of event_ids
The return value is a dict mapping group names to lists of events.
"""
if not event_ids:
defer.returnValue({})
def f(txn):
groups = set()
for event_id in event_ids:
group = self._simple_select_one_onecol_txn(
txn,
table="event_to_state_groups",
keyvalues={"event_id": event_id},
retcol="state_group",
allow_none=True,
)
if group:
groups.add(group)
res = {}
for group in groups:
state_ids = self._simple_select_onecol_txn(
txn,
table="state_groups_state",
keyvalues={"state_group": group},
retcol="event_id",
event_to_groups = yield self._get_state_group_for_events(
room_id, event_ids,
)
res[group] = state_ids
groups = set(event_to_groups.values())
group_to_state = yield self._get_state_for_groups(groups)
return res
states = yield self.runInteraction(
"get_state_groups",
f,
)
state_list = yield defer.gatherResults(
[
self._fetch_events_for_group(group, vals)
for group, vals in states.items()
],
consumeErrors=True,
)
defer.returnValue(dict(state_list))
def _fetch_events_for_group(self, key, events):
return self._get_events(
events, get_prev_content=False
).addCallback(
lambda evs: (key, evs)
)
defer.returnValue({
group: state_map.values()
for group, state_map in group_to_state.items()
})
def _store_state_groups_txn(self, txn, event, context):
return self._store_mult_state_groups_txn(txn, [(event, context)])
@ -204,64 +173,251 @@ class StateStore(SQLBaseStore):
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
@defer.inlineCallbacks
def get_state_for_events(self, room_id, event_ids):
def _get_state_groups_from_groups(self, groups_and_types):
"""Returns dictionary state_group -> state event ids
Args:
groups_and_types (list): list of 2-tuple (`group`, `types`)
"""
def f(txn):
groups = set()
event_to_group = {}
for event_id in event_ids:
# TODO: Remove this loop.
group = self._simple_select_one_onecol_txn(
txn,
table="event_to_state_groups",
keyvalues={"event_id": event_id},
retcol="state_group",
allow_none=True,
results = {}
for group, types in groups_and_types:
if types is not None:
where_clause = "AND (%s)" % (
" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
)
if group:
event_to_group[event_id] = group
groups.add(group)
else:
where_clause = ""
group_to_state_ids = {}
for group in groups:
state_ids = self._simple_select_onecol_txn(
txn,
table="state_groups_state",
keyvalues={"state_group": group},
retcol="event_id",
)
sql = (
"SELECT event_id FROM state_groups_state WHERE"
" state_group = ? %s"
) % (where_clause,)
group_to_state_ids[group] = state_ids
args = [group]
if types is not None:
args.extend([i for typ in types for i in typ])
return event_to_group, group_to_state_ids
txn.execute(sql, args)
res = yield self.runInteraction(
"annotate_events_with_state_groups",
results[group] = [r[0] for r in txn.fetchall()]
return results
return self.runInteraction(
"_get_state_groups_from_groups",
f,
)
event_to_group, group_to_state_ids = res
@defer.inlineCallbacks
def get_state_for_events(self, room_id, event_ids, types):
"""Given a list of event_ids and type tuples, return a list of state
dicts for each event. The state dicts will only have the type/state_keys
that are in the `types` list.
state_list = yield defer.gatherResults(
[
self._fetch_events_for_group(group, vals)
for group, vals in group_to_state_ids.items()
],
consumeErrors=True,
Args:
room_id (str)
event_ids (list)
types (list): List of (type, state_key) tuples which are used to
filter the state fetched. `state_key` may be None, which matches
any `state_key`
Returns:
deferred: A list of dicts corresponding to the event_ids given.
The dicts are mappings from (type, state_key) -> state_events
"""
event_to_groups = yield self._get_state_group_for_events(
room_id, event_ids,
)
state_dict = {
group: {
(ev.type, ev.state_key): ev
for ev in state
}
for group, state in state_list
groups = set(event_to_groups.values())
group_to_state = yield self._get_state_for_groups(groups, types)
event_to_state = {
event_id: group_to_state[group]
for event_id, group in event_to_groups.items()
}
defer.returnValue([
state_dict.get(event_to_group.get(event, None), None)
for event in event_ids
])
defer.returnValue({event: event_to_state[event] for event in event_ids})
@cached(num_args=2, lru=True, max_entries=10000)
def _get_state_group_for_event(self, room_id, event_id):
return self._simple_select_one_onecol(
table="event_to_state_groups",
keyvalues={
"event_id": event_id,
},
retcol="state_group",
allow_none=True,
desc="_get_state_group_for_event",
)
@cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
num_args=2)
def _get_state_group_for_events(self, room_id, event_ids):
"""Returns mapping event_id -> state_group
"""
def f(txn):
results = {}
for event_id in event_ids:
results[event_id] = self._simple_select_one_onecol_txn(
txn,
table="event_to_state_groups",
keyvalues={
"event_id": event_id,
},
retcol="state_group",
allow_none=True,
)
return results
return self.runInteraction("_get_state_group_for_events", f)
def _get_some_state_from_cache(self, group, types):
"""Checks if group is in cache. See `_get_state_for_groups`
Returns 3-tuple (`state_dict`, `missing_types`, `got_all`).
`missing_types` is the list of types that aren't in the cache for that
group. `got_all` is a bool indicating if we successfully retrieved all
requests state from the cache, if False we need to query the DB for the
missing state.
Args:
group: The state group to lookup
types (list): List of 2-tuples of the form (`type`, `state_key`),
where a `state_key` of `None` matches all state_keys for the
`type`.
"""
is_all, state_dict = self._state_group_cache.get(group)
type_to_key = {}
missing_types = set()
for typ, state_key in types:
if state_key is None:
type_to_key[typ] = None
missing_types.add((typ, state_key))
else:
if type_to_key.get(typ, object()) is not None:
type_to_key.setdefault(typ, set()).add(state_key)
if (typ, state_key) not in state_dict:
missing_types.add((typ, state_key))
sentinel = object()
def include(typ, state_key):
valid_state_keys = type_to_key.get(typ, sentinel)
if valid_state_keys is sentinel:
return False
if valid_state_keys is None:
return True
if state_key in valid_state_keys:
return True
return False
got_all = not (missing_types or types is None)
return {
k: v for k, v in state_dict.items()
if include(k[0], k[1])
}, missing_types, got_all
def _get_all_state_from_cache(self, group):
"""Checks if group is in cache. See `_get_state_for_groups`
Returns 2-tuple (`state_dict`, `got_all`). `got_all` is a bool
indicating if we successfully retrieved all requests state from the
cache, if False we need to query the DB for the missing state.
Args:
group: The state group to lookup
"""
is_all, state_dict = self._state_group_cache.get(group)
return state_dict, is_all
@defer.inlineCallbacks
def _get_state_for_groups(self, groups, types=None):
"""Given list of groups returns dict of group -> list of state events
with matching types. `types` is a list of `(type, state_key)`, where
a `state_key` of None matches all state_keys. If `types` is None then
all events are returned.
"""
results = {}
missing_groups_and_types = []
if types is not None:
for group in set(groups):
state_dict, missing_types, got_all = self._get_some_state_from_cache(
group, types
)
results[group] = state_dict
if not got_all:
missing_groups_and_types.append((group, missing_types))
else:
for group in set(groups):
state_dict, got_all = self._get_all_state_from_cache(
group
)
results[group] = state_dict
if not got_all:
missing_groups_and_types.append((group, None))
if not missing_groups_and_types:
defer.returnValue({
group: {
type_tuple: event
for type_tuple, event in state.items()
if event
}
for group, state in results.items()
})
# Okay, so we have some missing_types, lets fetch them.
cache_seq_num = self._state_group_cache.sequence
group_state_dict = yield self._get_state_groups_from_groups(
missing_groups_and_types
)
state_events = yield self._get_events(
[e_id for l in group_state_dict.values() for e_id in l],
get_prev_content=False
)
state_events = {e.event_id: e for e in state_events}
# Now we want to update the cache with all the things we fetched
# from the database.
for group, state_ids in group_state_dict.items():
if types:
# We delibrately put key -> None mappings into the cache to
# cache absence of the key, on the assumption that if we've
# explicitly asked for some types then we will probably ask
# for them again.
state_dict = {key: None for key in types}
state_dict.update(results[group])
else:
state_dict = results[group]
for event_id in state_ids:
state_event = state_events[event_id]
state_dict[(state_event.type, state_event.state_key)] = state_event
self._state_group_cache.update(
cache_seq_num,
key=group,
value=state_dict,
full=(types is None),
)
# We replace here to remove all the entries with None values.
results[group] = {
key: value for key, value in state_dict.items() if value
}
defer.returnValue(results)
def _make_group_id(clock):

View file

@ -36,6 +36,7 @@ what sort order was used:
from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logutils import log_function
@ -299,9 +300,8 @@ class StreamStore(SQLBaseStore):
defer.returnValue((events, token))
@defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token,
with_feedback=False, from_token=None):
@cachedInlineCallbacks(num_args=4)
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
# TODO (erikj): Handle compressed feedback
end_token = RoomStreamToken.parse_stream_token(end_token)

View file

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, cached
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from collections import namedtuple

View file

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.metrics
DEBUG_CACHES = False
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
caches_by_name = {}
cache_counter = metrics.register_cache(
"cache",
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
labels=["name"],
)

View file

@ -0,0 +1,377 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError
from synapse.util.caches.lrucache import LruCache
from . import caches_by_name, DEBUG_CACHES, cache_counter
from twisted.internet import defer
from collections import OrderedDict
import functools
import inspect
import threading
logger = logging.getLogger(__name__)
_CacheSentinel = object()
class Cache(object):
def __init__(self, name, max_entries=1000, keylen=1, lru=True):
if lru:
self.cache = LruCache(max_size=max_entries)
self.max_entries = None
else:
self.cache = OrderedDict()
self.max_entries = max_entries
self.name = name
self.keylen = keylen
self.sequence = 0
self.thread = None
caches_by_name[name] = self.cache
def check_thread(self):
expected_thread = self.thread
if expected_thread is None:
self.thread = threading.current_thread()
else:
if expected_thread is not threading.current_thread():
raise ValueError(
"Cache objects can only be accessed from the main thread"
)
def get(self, key, default=_CacheSentinel):
val = self.cache.get(key, _CacheSentinel)
if val is not _CacheSentinel:
cache_counter.inc_hits(self.name)
return val
cache_counter.inc_misses(self.name)
if default is _CacheSentinel:
raise KeyError()
else:
return default
def update(self, sequence, key, value):
self.check_thread()
if self.sequence == sequence:
# Only update the cache if the caches sequence number matches the
# number that the cache had before the SELECT was started (SYN-369)
self.prefill(key, value)
def prefill(self, key, value):
if self.max_entries is not None:
while len(self.cache) >= self.max_entries:
self.cache.popitem(last=False)
self.cache[key] = value
def invalidate(self, key):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError(
"The cache key must be a tuple not %r" % (type(key),)
)
# Increment the sequence number so that any SELECT statements that
# raced with the INSERT don't update the cache (SYN-369)
self.sequence += 1
self.cache.pop(key, None)
def invalidate_all(self):
self.check_thread()
self.sequence += 1
self.cache.clear()
class CacheDescriptor(object):
""" A method decorator that applies a memoizing cache around the function.
This caches deferreds, rather than the results themselves. Deferreds that
fail are removed from the cache.
The function is presumed to take zero or more arguments, which are used in
a tuple as the key for the cache. Hits are served directly from the cache;
misses use the function body to generate the value.
The wrapped function has an additional member, a callable called
"invalidate". This can be used to remove individual entries from the cache.
The wrapped function has another additional callable, called "prefill",
which can be used to insert values into the cache specifically, without
calling the calculation function.
"""
def __init__(self, orig, max_entries=1000, num_args=1, lru=True,
inlineCallbacks=False):
self.orig = orig
if inlineCallbacks:
self.function_to_call = defer.inlineCallbacks(orig)
else:
self.function_to_call = orig
self.max_entries = max_entries
self.num_args = num_args
self.lru = lru
self.arg_names = inspect.getargspec(orig).args[1:num_args+1]
if len(self.arg_names) < self.num_args:
raise Exception(
"Not enough explicit positional arguments to key off of for %r."
" (@cached cannot key off of *args or **kwars)"
% (orig.__name__,)
)
self.cache = Cache(
name=self.orig.__name__,
max_entries=self.max_entries,
keylen=self.num_args,
lru=self.lru,
)
def __get__(self, obj, objtype=None):
@functools.wraps(self.orig)
def wrapped(*args, **kwargs):
arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs)
cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names)
try:
cached_result_d = self.cache.get(cache_key)
observer = cached_result_d.observe()
if DEBUG_CACHES:
@defer.inlineCallbacks
def check_result(cached_result):
actual_result = yield self.function_to_call(obj, *args, **kwargs)
if actual_result != cached_result:
logger.error(
"Stale cache entry %s%r: cached: %r, actual %r",
self.orig.__name__, cache_key,
cached_result, actual_result,
)
raise ValueError("Stale cache entry")
defer.returnValue(cached_result)
observer.addCallback(check_result)
return observer
except KeyError:
# Get the sequence number of the cache before reading from the
# database so that we can tell if the cache is invalidated
# while the SELECT is executing (SYN-369)
sequence = self.cache.sequence
ret = defer.maybeDeferred(
self.function_to_call,
obj, *args, **kwargs
)
def onErr(f):
self.cache.invalidate(cache_key)
return f
ret.addErrback(onErr)
ret = ObservableDeferred(ret, consumeErrors=True)
self.cache.update(sequence, cache_key, ret)
return ret.observe()
wrapped.invalidate = self.cache.invalidate
wrapped.invalidate_all = self.cache.invalidate_all
wrapped.prefill = self.cache.prefill
obj.__dict__[self.orig.__name__] = wrapped
return wrapped
class CacheListDescriptor(object):
"""Wraps an existing cache to support bulk fetching of keys.
Given a list of keys it looks in the cache to find any hits, then passes
the list of missing keys to the wrapped fucntion.
"""
def __init__(self, orig, cache, list_name, num_args=1, inlineCallbacks=False):
"""
Args:
orig (function)
cache (Cache)
list_name (str): Name of the argument which is the bulk lookup list
num_args (int)
inlineCallbacks (bool): Whether orig is a generator that should
be wrapped by defer.inlineCallbacks
"""
self.orig = orig
if inlineCallbacks:
self.function_to_call = defer.inlineCallbacks(orig)
else:
self.function_to_call = orig
self.num_args = num_args
self.list_name = list_name
self.arg_names = inspect.getargspec(orig).args[1:num_args+1]
self.list_pos = self.arg_names.index(self.list_name)
self.cache = cache
self.sentinel = object()
if len(self.arg_names) < self.num_args:
raise Exception(
"Not enough explicit positional arguments to key off of for %r."
" (@cached cannot key off of *args or **kwars)"
% (orig.__name__,)
)
if self.list_name not in self.arg_names:
raise Exception(
"Couldn't see arguments %r for %r."
% (self.list_name, cache.name,)
)
def __get__(self, obj, objtype=None):
@functools.wraps(self.orig)
def wrapped(*args, **kwargs):
arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs)
keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names]
list_args = arg_dict[self.list_name]
# cached is a dict arg -> deferred, where deferred results in a
# 2-tuple (`arg`, `result`)
cached = {}
missing = []
for arg in list_args:
key = list(keyargs)
key[self.list_pos] = arg
try:
res = self.cache.get(tuple(key)).observe()
res.addCallback(lambda r, arg: (arg, r), arg)
cached[arg] = res
except KeyError:
missing.append(arg)
if missing:
sequence = self.cache.sequence
args_to_call = dict(arg_dict)
args_to_call[self.list_name] = missing
ret_d = defer.maybeDeferred(
self.function_to_call,
**args_to_call
)
ret_d = ObservableDeferred(ret_d)
# We need to create deferreds for each arg in the list so that
# we can insert the new deferred into the cache.
for arg in missing:
observer = ret_d.observe()
observer.addCallback(lambda r, arg: r.get(arg, None), arg)
observer = ObservableDeferred(observer)
key = list(keyargs)
key[self.list_pos] = arg
self.cache.update(sequence, tuple(key), observer)
def invalidate(f, key):
self.cache.invalidate(key)
return f
observer.addErrback(invalidate, tuple(key))
res = observer.observe()
res.addCallback(lambda r, arg: (arg, r), arg)
cached[arg] = res
return defer.gatherResults(
cached.values(),
consumeErrors=True,
).addErrback(unwrapFirstError).addCallback(lambda res: dict(res))
obj.__dict__[self.orig.__name__] = wrapped
return wrapped
def cached(max_entries=1000, num_args=1, lru=True):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
num_args=num_args,
lru=lru
)
def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
num_args=num_args,
lru=lru,
inlineCallbacks=True,
)
def cachedList(cache, list_name, num_args=1, inlineCallbacks=False):
"""Creates a descriptor that wraps a function in a `CacheListDescriptor`.
Used to do batch lookups for an already created cache. A single argument
is specified as a list that is iterated through to lookup keys in the
original cache. A new list consisting of the keys that weren't in the cache
get passed to the original function, the result of which is stored in the
cache.
Args:
cache (Cache): The underlying cache to use.
list_name (str): The name of the argument that is the list to use to
do batch lookups in the cache.
num_args (int): Number of arguments to use as the key in the cache.
inlineCallbacks (bool): Should the function be wrapped in an
`defer.inlineCallbacks`?
Example:
class Example(object):
@cached(num_args=2)
def do_something(self, first_arg):
...
@cachedList(do_something.cache, list_name="second_args", num_args=2)
def batch_do_something(self, first_arg, second_args):
...
"""
return lambda orig: CacheListDescriptor(
orig,
cache=cache,
list_name=list_name,
num_args=num_args,
inlineCallbacks=inlineCallbacks,
)

View file

@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.caches.lrucache import LruCache
from collections import namedtuple
from . import caches_by_name, cache_counter
import threading
import logging
logger = logging.getLogger(__name__)
DictionaryEntry = namedtuple("DictionaryEntry", ("full", "value"))
class DictionaryCache(object):
"""Caches key -> dictionary lookups, supporting caching partial dicts, i.e.
fetching a subset of dictionary keys for a particular key.
"""
def __init__(self, name, max_entries=1000):
self.cache = LruCache(max_size=max_entries)
self.name = name
self.sequence = 0
self.thread = None
# caches_by_name[name] = self.cache
class Sentinel(object):
__slots__ = []
self.sentinel = Sentinel()
caches_by_name[name] = self.cache
def check_thread(self):
expected_thread = self.thread
if expected_thread is None:
self.thread = threading.current_thread()
else:
if expected_thread is not threading.current_thread():
raise ValueError(
"Cache objects can only be accessed from the main thread"
)
def get(self, key, dict_keys=None):
entry = self.cache.get(key, self.sentinel)
if entry is not self.sentinel:
cache_counter.inc_hits(self.name)
if dict_keys is None:
return DictionaryEntry(entry.full, dict(entry.value))
else:
return DictionaryEntry(entry.full, {
k: entry.value[k]
for k in dict_keys
if k in entry.value
})
cache_counter.inc_misses(self.name)
return DictionaryEntry(False, {})
def invalidate(self, key):
self.check_thread()
# Increment the sequence number so that any SELECT statements that
# raced with the INSERT don't update the cache (SYN-369)
self.sequence += 1
self.cache.pop(key, None)
def invalidate_all(self):
self.check_thread()
self.sequence += 1
self.cache.clear()
def update(self, sequence, key, value, full=False):
self.check_thread()
if self.sequence == sequence:
# Only update the cache if the caches sequence number matches the
# number that the cache had before the SELECT was started (SYN-369)
if full:
self._insert(key, value)
else:
self._update_or_insert(key, value)
def _update_or_insert(self, key, value):
entry = self.cache.setdefault(key, DictionaryEntry(False, {}))
entry.value.update(value)
def _insert(self, key, value):
self.cache[key] = DictionaryEntry(True, value)

View file

@ -19,7 +19,7 @@ from twisted.internet import defer
from synapse.util.async import ObservableDeferred
from synapse.storage._base import Cache, cached
from synapse.util.caches.descriptors import Cache, cached
class CacheTestCase(unittest.TestCase):

View file

@ -69,7 +69,7 @@ class StateGroupStore(object):
self._next_group = 1
def get_state_groups(self, event_ids):
def get_state_groups(self, room_id, event_ids):
groups = {}
for event_id in event_ids:
group = self._event_to_state_group.get(event_id)

View file

@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from tests import unittest
from synapse.util.caches.dictionary_cache import DictionaryCache
class DictCacheTestCase(unittest.TestCase):
def setUp(self):
self.cache = DictionaryCache("foobar")
def test_simple_cache_hit_full(self):
key = "test_simple_cache_hit_full"
v = self.cache.get(key)
self.assertEqual((False, {}), v)
seq = self.cache.sequence
test_value = {"test": "test_simple_cache_hit_full"}
self.cache.update(seq, key, test_value, full=True)
c = self.cache.get(key)
self.assertEqual(test_value, c.value)
def test_simple_cache_hit_partial(self):
key = "test_simple_cache_hit_partial"
seq = self.cache.sequence
test_value = {
"test": "test_simple_cache_hit_partial"
}
self.cache.update(seq, key, test_value, full=True)
c = self.cache.get(key, ["test"])
self.assertEqual(test_value, c.value)
def test_simple_cache_miss_partial(self):
key = "test_simple_cache_miss_partial"
seq = self.cache.sequence
test_value = {
"test": "test_simple_cache_miss_partial"
}
self.cache.update(seq, key, test_value, full=True)
c = self.cache.get(key, ["test2"])
self.assertEqual({}, c.value)
def test_simple_cache_hit_miss_partial(self):
key = "test_simple_cache_hit_miss_partial"
seq = self.cache.sequence
test_value = {
"test": "test_simple_cache_hit_miss_partial",
"test2": "test_simple_cache_hit_miss_partial2",
"test3": "test_simple_cache_hit_miss_partial3",
}
self.cache.update(seq, key, test_value, full=True)
c = self.cache.get(key, ["test2"])
self.assertEqual({"test2": "test_simple_cache_hit_miss_partial2"}, c.value)
def test_multi_insert(self):
key = "test_simple_cache_hit_miss_partial"
seq = self.cache.sequence
test_value_1 = {
"test": "test_simple_cache_hit_miss_partial",
}
self.cache.update(seq, key, test_value_1, full=False)
seq = self.cache.sequence
test_value_2 = {
"test2": "test_simple_cache_hit_miss_partial2",
}
self.cache.update(seq, key, test_value_2, full=False)
c = self.cache.get(key)
self.assertEqual(
{
"test": "test_simple_cache_hit_miss_partial",
"test2": "test_simple_cache_hit_miss_partial2",
},
c.value
)

View file

@ -16,7 +16,7 @@
from .. import unittest
from synapse.util.lrucache import LruCache
from synapse.util.caches.lrucache import LruCache
class LruCacheTestCase(unittest.TestCase):
@ -52,5 +52,3 @@ class LruCacheTestCase(unittest.TestCase):
cache["key"] = 1
self.assertEquals(cache.pop("key"), 1)
self.assertEquals(cache.pop("key"), None)