0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-06-20 11:38:20 +02:00

Merge branch 'develop' into matthew/filter_members

This commit is contained in:
Matthew Hodgson 2018-06-10 12:26:14 +03:00
commit c96d882a02
28 changed files with 532 additions and 179 deletions

1
.gitignore vendored
View file

@ -43,6 +43,7 @@ media_store/
build/
venv/
venv*/
localhost-800*/
static/client/register/register_config.js

View file

@ -1,3 +1,72 @@
Changes in synapse v0.31.1 (2018-06-08)
=======================================
v0.31.1 fixes a security bug in the ``get_missing_events`` federation API
where event visibility rules were not applied correctly.
We are not aware of it being actively exploited but please upgrade asap.
Bug Fixes:
* Fix event filtering in get_missing_events handler (PR #3371)
Changes in synapse v0.31.0 (2018-06-06)
=======================================
Most notable change from v0.30.0 is to switch to the python prometheus library to improve system
stats reporting. WARNING: this changes a number of prometheus metrics in a
backwards-incompatible manner. For more details, see
`docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_.
Bug Fixes:
* Fix metric documentation tables (PR #3341)
* Fix LaterGauge error handling (694968f)
* Fix replication metrics (b7e7fd2)
Changes in synapse v0.31.0-rc1 (2018-06-04)
==========================================
Features:
* Switch to the Python Prometheus library (PR #3256, #3274)
* Let users leave the server notice room after joining (PR #3287)
Changes:
* daily user type phone home stats (PR #3264)
* Use iter* methods for _filter_events_for_server (PR #3267)
* Docs on consent bits (PR #3268)
* Remove users from user directory on deactivate (PR #3277)
* Avoid sending consent notice to guest users (PR #3288)
* disable CPUMetrics if no /proc/self/stat (PR #3299)
* Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307)
* Add private IPv6 addresses to example config for url preview blacklist (PR #3317) Thanks to @thegcat!
* Reduce stuck read-receipts: ignore depth when updating (PR #3318)
* Put python's logs into Trial when running unit tests (PR #3319)
Changes, python 3 migration:
* Replace some more comparisons with six (PR #3243) Thanks to @NotAFile!
* replace some iteritems with six (PR #3244) Thanks to @NotAFile!
* Add batch_iter to utils (PR #3245) Thanks to @NotAFile!
* use repr, not str (PR #3246) Thanks to @NotAFile!
* Misc Python3 fixes (PR #3247) Thanks to @NotAFile!
* Py3 storage/_base.py (PR #3278) Thanks to @NotAFile!
* more six iteritems (PR #3279) Thanks to @NotAFile!
* More Misc. py3 fixes (PR #3280) Thanks to @NotAFile!
* remaining isintance fixes (PR #3281) Thanks to @NotAFile!
* py3-ize state.py (PR #3283) Thanks to @NotAFile!
* extend tox testing for py3 to avoid regressions (PR #3302) Thanks to @krombel!
* use memoryview in py3 (PR #3303) Thanks to @NotAFile!
Bugs:
* Fix federation backfill bugs (PR #3261)
* federation: fix LaterGauge usage (PR #3328) Thanks to @intelfx!
Changes in synapse v0.30.0 (2018-05-24)
==========================================

View file

@ -63,30 +63,40 @@ The duplicated metrics deprecated in Synapse 0.27.0 have been removed.
All time duration-based metrics have been changed to be seconds. This affects:
================================
msec -> sec metrics
================================
python_gc_time
python_twisted_reactor_tick_time
synapse_storage_query_time
synapse_storage_schedule_time
synapse_storage_transaction_time
================================
+----------------------------------+
| msec -> sec metrics |
+==================================+
| python_gc_time |
+----------------------------------+
| python_twisted_reactor_tick_time |
+----------------------------------+
| synapse_storage_query_time |
+----------------------------------+
| synapse_storage_schedule_time |
+----------------------------------+
| synapse_storage_transaction_time |
+----------------------------------+
Several metrics have been changed to be histograms, which sort entries into
buckets and allow better analysis. The following metrics are now histograms:
=========================================
Altered metrics
=========================================
python_gc_time
python_twisted_reactor_pending_calls
python_twisted_reactor_tick_time
synapse_http_server_response_time_seconds
synapse_storage_query_time
synapse_storage_schedule_time
synapse_storage_transaction_time
=========================================
+-------------------------------------------+
| Altered metrics |
+===========================================+
| python_gc_time |
+-------------------------------------------+
| python_twisted_reactor_pending_calls |
+-------------------------------------------+
| python_twisted_reactor_tick_time |
+-------------------------------------------+
| synapse_http_server_response_time_seconds |
+-------------------------------------------+
| synapse_storage_query_time |
+-------------------------------------------+
| synapse_storage_schedule_time |
+-------------------------------------------+
| synapse_storage_transaction_time |
+-------------------------------------------+
Block and response metrics renamed for 0.27.0

View file

@ -9,19 +9,19 @@ Set up database
Assuming your PostgreSQL database user is called ``postgres``, create a user
``synapse_user`` with::
su - postgres
createuser --pwprompt synapse_user
su - postgres
createuser --pwprompt synapse_user
The PostgreSQL database used *must* have the correct encoding set, otherwise it
would not be able to store UTF8 strings. To create a database with the correct
encoding use, e.g.::
CREATE DATABASE synapse
ENCODING 'UTF8'
LC_COLLATE='C'
LC_CTYPE='C'
template=template0
OWNER synapse_user;
CREATE DATABASE synapse
ENCODING 'UTF8'
LC_COLLATE='C'
LC_CTYPE='C'
template=template0
OWNER synapse_user;
This would create an appropriate database named ``synapse`` owned by the
``synapse_user`` user (which must already exist).
@ -126,7 +126,7 @@ run::
--postgres-config homeserver-postgres.yaml
Once that has completed, change the synapse config to point at the PostgreSQL
database configuration file ``homeserver-postgres.yaml``:
database configuration file ``homeserver-postgres.yaml``::
./synctl stop
mv homeserver.yaml homeserver-old-sqlite.yaml

View file

@ -17,4 +17,5 @@ ignore =
[flake8]
max-line-length = 90
# W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it.
ignore = W503
# E203 is contrary to PEP8.
ignore = W503,E203

View file

@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.30.0"
__version__ = "0.31.1"

View file

@ -171,6 +171,10 @@ def main():
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
for cache_name, factor in cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
if options.worker:
start_stop_synapse = False

View file

@ -292,4 +292,8 @@ class ApplicationService(object):
return self.rate_limited
def __str__(self):
return "ApplicationService: %s" % (self.__dict__,)
# copy dictionary and redact token fields so they don't get logged
dict_copy = self.__dict__.copy()
dict_copy["token"] = "<redacted>"
dict_copy["hs_token"] = "<redacted>"
return "ApplicationService: %s" % (dict_copy,)

View file

@ -24,8 +24,27 @@ from synapse.types import ThirdPartyInstanceID
import logging
import urllib
from prometheus_client import Counter
logger = logging.getLogger(__name__)
sent_transactions_counter = Counter(
"synapse_appservice_api_sent_transactions",
"Number of /transactions/ requests sent",
["service"]
)
failed_transactions_counter = Counter(
"synapse_appservice_api_failed_transactions",
"Number of /transactions/ requests that failed to send",
["service"]
)
sent_events_counter = Counter(
"synapse_appservice_api_sent_events",
"Number of events sent to the AS",
["service"]
)
HOUR_IN_MS = 60 * 60 * 1000
@ -219,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient):
args={
"access_token": service.hs_token
})
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))
defer.returnValue(True)
return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("push_bulk to %s threw exception %s", uri, ex)
failed_transactions_counter.labels(service.id).inc()
defer.returnValue(False)
def _serialize(self, events):

View file

@ -18,6 +18,9 @@ from ._base import Config
DEFAULT_CONFIG = """\
# User Consent configuration
#
# for detailed instructions, see
# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md
#
# Parts of this section are required if enabling the 'consent' resource under
# 'listeners', in particular 'template_dir' and 'version'.
#

View file

@ -27,10 +27,12 @@ from synapse.util.metrics import Measure
from twisted.internet import defer
from signedjson.sign import (
verify_signed_json, signature_ids, sign_json, encode_canonical_json
verify_signed_json, signature_ids, sign_json, encode_canonical_json,
SignatureVerifyException,
)
from signedjson.key import (
is_signing_algorithm_supported, decode_verify_key_bytes
is_signing_algorithm_supported, decode_verify_key_bytes,
encode_verify_key_base64,
)
from unpaddedbase64 import decode_base64, encode_base64
@ -56,7 +58,7 @@ Attributes:
key_ids(set(str)): The set of key_ids to that could be used to verify the
JSON object
json_object(dict): The JSON object to verify.
deferred(twisted.internet.defer.Deferred):
deferred(Deferred[str, str, nacl.signing.VerifyKey]):
A deferred (server_name, key_id, verify_key) tuple that resolves when
a verify key has been fetched. The deferreds' callbacks are run with no
logcontext.
@ -736,6 +738,17 @@ class Keyring(object):
@defer.inlineCallbacks
def _handle_key_deferred(verify_request):
"""Waits for the key to become available, and then performs a verification
Args:
verify_request (VerifyKeyRequest):
Returns:
Deferred[None]
Raises:
SynapseError if there was a problem performing the verification
"""
server_name = verify_request.server_name
try:
with PreserveLoggingContext():
@ -768,11 +781,17 @@ def _handle_key_deferred(verify_request):
))
try:
verify_signed_json(json_object, server_name, verify_key)
except Exception:
except SignatureVerifyException as e:
logger.debug(
"Error verifying signature for %s:%s:%s with key %s: %s",
server_name, verify_key.alg, verify_key.version,
encode_verify_key_base64(verify_key),
str(e),
)
raise SynapseError(
401,
"Invalid signature for server %s with key %s:%s" % (
server_name, verify_key.alg, verify_key.version
"Invalid signature for server %s with key %s:%s: %s" % (
server_name, verify_key.alg, verify_key.version, str(e),
),
Codes.UNAUTHORIZED,
)

View file

@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
from synapse.metrics import LaterGauge
from blist import sorteddict
from sortedcontainers import SortedDict
from collections import namedtuple
import logging
@ -55,19 +55,19 @@ class FederationRemoteSendQueue(object):
self.is_mine_id = hs.is_mine_id
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
self.presence_changed = sorteddict() # Stream position -> user_id
self.presence_changed = SortedDict() # Stream position -> user_id
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
self.edus = sorteddict() # stream position -> Edu
self.edus = SortedDict() # stream position -> Edu
self.failures = sorteddict() # stream position -> (destination, Failure)
self.failures = SortedDict() # stream position -> (destination, Failure)
self.device_messages = sorteddict() # stream position -> destination
self.device_messages = SortedDict() # stream position -> destination
self.pos = 1
self.pos_time = sorteddict()
self.pos_time = SortedDict()
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
@ -75,7 +75,7 @@ class FederationRemoteSendQueue(object):
# changes. ARGH.
def register(name, queue):
LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
"", lambda: len(queue))
"", [], lambda: len(queue))
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
@ -98,7 +98,7 @@ class FederationRemoteSendQueue(object):
now = self.clock.time_msec()
keys = self.pos_time.keys()
time = keys.bisect_left(now - FIVE_MINUTES_AGO)
time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
if not keys[:time]:
return
@ -113,7 +113,7 @@ class FederationRemoteSendQueue(object):
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
i = keys.bisect_left(position_to_delete)
i = self.presence_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]
@ -131,7 +131,7 @@ class FederationRemoteSendQueue(object):
# Delete things out of keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_left(position_to_delete)
i = self.keyed_edu_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.keyed_edu_changed[key]
@ -145,19 +145,19 @@ class FederationRemoteSendQueue(object):
# Delete things out of edu map
keys = self.edus.keys()
i = keys.bisect_left(position_to_delete)
i = self.edus.bisect_left(position_to_delete)
for key in keys[:i]:
del self.edus[key]
# Delete things out of failure map
keys = self.failures.keys()
i = keys.bisect_left(position_to_delete)
i = self.failures.bisect_left(position_to_delete)
for key in keys[:i]:
del self.failures[key]
# Delete things out of device map
keys = self.device_messages.keys()
i = keys.bisect_left(position_to_delete)
i = self.device_messages.bisect_left(position_to_delete)
for key in keys[:i]:
del self.device_messages[key]
@ -250,13 +250,12 @@ class FederationRemoteSendQueue(object):
self._clear_queue_before_pos(federation_ack)
# Fetch changed presence
keys = self.presence_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
i = self.presence_changed.bisect_right(from_token)
j = self.presence_changed.bisect_right(to_token) + 1
dest_user_ids = [
(pos, user_id)
for pos in keys[i:j]
for user_id in self.presence_changed[pos]
for pos, user_id_list in self.presence_changed.items()[i:j]
for user_id in user_id_list
]
for (key, user_id) in dest_user_ids:
@ -265,13 +264,12 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changes keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
i = self.keyed_edu_changed.bisect_right(from_token)
j = self.keyed_edu_changed.bisect_right(to_token) + 1
# We purposefully clobber based on the key here, python dict comprehensions
# always use the last value, so this will correctly point to the last
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
for ((destination, edu_key), pos) in iteritems(keyed_edus):
rows.append((pos, KeyedEduRow(
@ -280,19 +278,17 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changed edus
keys = self.edus.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
edus = ((k, self.edus[k]) for k in keys[i:j])
i = self.edus.bisect_right(from_token)
j = self.edus.bisect_right(to_token) + 1
edus = self.edus.items()[i:j]
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
# Fetch changed failures
keys = self.failures.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
failures = ((k, self.failures[k]) for k in keys[i:j])
i = self.failures.bisect_right(from_token)
j = self.failures.bisect_right(to_token) + 1
failures = self.failures.items()[i:j]
for (pos, (destination, failure)) in failures:
rows.append((pos, FailureRow(
@ -301,10 +297,9 @@ class FederationRemoteSendQueue(object):
)))
# Fetch changed device messages
keys = self.device_messages.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
i = self.device_messages.bisect_right(from_token)
j = self.device_messages.bisect_right(to_token) + 1
device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(

View file

@ -1794,6 +1794,10 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
missing_events = yield self._filter_events_for_server(
origin, room_id, missing_events,
)
defer.returnValue(missing_events)
@defer.inlineCallbacks

View file

@ -13,6 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from twisted.internet.defer import CancelledError
from twisted.python import failure
@ -34,3 +36,14 @@ def cancelled_to_request_timed_out_error(value, timeout):
value.trap(CancelledError)
raise RequestTimedOutError()
return value
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
def redact_uri(uri):
"""Strips access tokens from the uri replaces with <redacted>"""
return ACCESS_TOKEN_RE.sub(
br'\1<redacted>\3',
uri
)

View file

@ -19,7 +19,7 @@ from OpenSSL.SSL import VERIFY_NONE
from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
@ -90,7 +90,8 @@ class SimpleHttpClient(object):
# counters to it
outgoing_requests_counter.labels(method).inc()
logger.info("Sending request %s %s", method, uri)
# log request but strip `access_token` (AS requests for example include this)
logger.info("Sending request %s %s", method, redact_uri(uri))
try:
request_deferred = self.agent.request(
@ -105,14 +106,14 @@ class SimpleHttpClient(object):
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
method, uri, response.code
method, redact_uri(uri), response.code
)
defer.returnValue(response)
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method, uri, type(e).__name__, e.message
method, redact_uri(uri), type(e).__name__, e.message
)
raise e

View file

@ -14,18 +14,16 @@
import contextlib
import logging
import re
import time
from twisted.web.server import Site, Request
from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
_next_request_seq = 0
@ -69,10 +67,7 @@ class SynapseRequest(Request):
return "%s-%i" % (self.method, self.request_seq)
def get_redacted_uri(self):
return ACCESS_TOKEN_RE.sub(
br'\1<redacted>\3',
self.uri
)
return redact_uri(self.uri)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]

View file

@ -60,10 +60,13 @@ class LaterGauge(object):
try:
calls = self.caller()
except Exception as e:
print(e)
logger.err()
except Exception:
logger.exception(
"Exception running callback for LaterGuage(%s)",
self.name,
)
yield g
return
if isinstance(calls, dict):
for k, v in calls.items():

View file

@ -50,14 +50,16 @@ REQUIREMENTS = {
"bcrypt": ["bcrypt>=3.1.0"],
"pillow": ["PIL"],
"pydenticon": ["pydenticon"],
"blist": ["blist"],
"sortedcontainers": ["sortedcontainers"],
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
"prometheus_client": ["prometheus_client"],
"attr": ["attr"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
"matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],

View file

@ -622,7 +622,7 @@ tcp_inbound_commands = LaterGauge(
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter.counts)
for k, count in iteritems(p.inbound_commands_counter)
})
tcp_outbound_commands = LaterGauge(
@ -630,7 +630,7 @@ tcp_outbound_commands = LaterGauge(
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter.counts)
for k, count in iteritems(p.outbound_commands_counter)
})
# number of updates received for each RDATA stream

View file

@ -169,16 +169,12 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
yield self.store.find_first_stream_ordering_after_ts(ts)
)
room_event_after_stream_ordering = (
r = (
yield self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering,
)
)
if room_event_after_stream_ordering:
token = yield self.store.get_topological_token_for_event(
room_event_after_stream_ordering,
)
else:
if not r:
logger.warn(
"[purge] purging events not possible: No event found "
"(received_ts %i => stream_ordering %i)",
@ -189,8 +185,10 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
"there is no event to be purged",
errcode=Codes.NOT_FOUND,
)
(stream, topo, _event_id) = r
token = "t%d-%d" % (topo, stream)
logger.info(
"[purge] purging up to token %d (received_ts %i => "
"[purge] purging up to token %s (received_ts %i => "
"stream_ordering %i)",
token, ts, stream_ordering,
)

View file

@ -578,7 +578,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
txn.execute(sql, (user_id, room_id))
txn.call_after(self.was_forgotten_at.invalidate_all)
txn.call_after(self.did_forget.invalidate, (user_id, room_id))
self._invalidate_cache_and_stream(
txn, self.who_forgot_in_room, (room_id,)
@ -609,31 +608,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
count = yield self.runInteraction("did_forget_membership", f)
defer.returnValue(count == 0)
@cachedInlineCallbacks(num_args=3)
def was_forgotten_at(self, user_id, room_id, event_id):
"""Returns whether user_id has elected to discard history for room_id at
event_id.
event_id must be a membership event."""
def f(txn):
sql = (
"SELECT"
" forgotten"
" FROM"
" room_memberships"
" WHERE"
" user_id = ?"
" AND"
" room_id = ?"
" AND"
" event_id = ?"
)
txn.execute(sql, (user_id, room_id, event_id))
rows = txn.fetchall()
return rows[0][0]
forgot = yield self.runInteraction("did_forget_membership_at", f)
defer.returnValue(forgot == 1)
@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(

View file

@ -23,7 +23,7 @@ from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
from synapse.util.caches import intern_string, get_cache_factor_for
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
@ -57,7 +57,7 @@ class StateGroupWorkerStore(SQLBaseStore):
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
)
@cached(max_entries=100000, iterable=True)
@ -305,7 +305,7 @@ class StateGroupWorkerStore(SQLBaseStore):
for typ in types:
if typ[1] is None:
where_clauses.append("(type = ?)")
where_args.extend(typ[0])
where_args.append(typ[0])
wildcard_types = True
else:
where_clauses.append("(type = ? AND state_key = ?)")

View file

@ -22,6 +22,16 @@ import six
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
def get_cache_factor_for(cache_name):
env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
factor = os.environ.get(env_var)
if factor:
return float(factor)
return CACHE_SIZE_FACTOR
caches_by_name = {}
collectors_by_name = {}

View file

@ -17,7 +17,7 @@ import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.stringutils import to_ascii
@ -313,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase):
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
cache_context=cache_context)
max_entries = int(max_entries * CACHE_SIZE_FACTOR)
max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
self.max_entries = max_entries
self.tree = tree

View file

@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
from synapse.util import caches
from blist import sorteddict
from sortedcontainers import SortedDict
import logging
@ -32,16 +32,18 @@ class StreamChangeCache(object):
entities that may have changed since that position. If position key is too
old then the cache will simply return all given entities.
"""
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
self._max_size = int(max_size * CACHE_SIZE_FACTOR)
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
self._entity_to_key = {}
self._cache = sorteddict()
self._cache = SortedDict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
self.metrics = register_cache("cache", self.name, self._cache)
self.metrics = caches.register_cache("cache", self.name, self._cache)
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
if prefilled_cache:
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
@ -65,22 +67,25 @@ class StreamChangeCache(object):
return False
def get_entities_changed(self, entities, stream_pos):
"""Returns subset of entities that have had new things since the
given position. If the position is too old it will just return the given list.
"""
Returns subset of entities that have had new things since the given
position. Entities unknown to the cache will be returned. If the
position is too old it will just return the given list.
"""
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
keys = self._cache.keys()
i = keys.bisect_right(stream_pos)
not_known_entities = set(entities) - set(self._entity_to_key)
result = set(
self._cache[k] for k in keys[i:]
).intersection(entities)
result = (
set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
.intersection(entities)
.union(not_known_entities)
)
self.metrics.inc_hits()
else:
result = entities
result = set(entities)
self.metrics.inc_misses()
return result
@ -90,12 +95,13 @@ class StreamChangeCache(object):
"""
assert type(stream_pos) is int
if not self._cache:
# If we have no cache, nothing can have changed.
return False
if stream_pos >= self._earliest_known_stream_pos:
self.metrics.inc_hits()
keys = self._cache.keys()
i = keys.bisect_right(stream_pos)
return i < len(keys)
return self._cache.bisect_right(stream_pos) < len(self._cache)
else:
self.metrics.inc_misses()
return True
@ -107,10 +113,7 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
keys = self._cache.keys()
i = keys.bisect_right(stream_pos)
return [self._cache[k] for k in keys[i:]]
return self._cache.values()[self._cache.bisect_right(stream_pos) :]
else:
return None
@ -129,8 +132,10 @@ class StreamChangeCache(object):
self._entity_to_key[entity] = stream_pos
while len(self._cache) > self._max_size:
k, r = self._cache.popitem()
self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(
k, self._earliest_known_stream_pos,
)
self._entity_to_key.pop(r, None)
def get_max_pos_of_last_change(self, entity):

View file

@ -12,23 +12,37 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import twisted
from twisted.trial import unittest
import logging
# logging doesn't have a "don't log anything at all EVARRRR setting,
# but since the highest value is 50, 1000000 should do ;)
NEVER = 1000000
import twisted
import twisted.logger
from twisted.trial import unittest
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
"%(levelname)s:%(name)s:%(message)s [%(pathname)s:%(lineno)d]"
))
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(NEVER)
logging.getLogger("synapse.storage.SQL").setLevel(NEVER)
logging.getLogger("synapse.storage.txn").setLevel(NEVER)
from synapse.util.logcontext import LoggingContextFilter
# Set up putting Synapse's logs into Trial's.
rootLogger = logging.getLogger()
log_format = (
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s"
)
class ToTwistedHandler(logging.Handler):
tx_log = twisted.logger.Logger()
def emit(self, record):
log_entry = self.format(record)
log_level = record.levelname.lower().replace('warning', 'warn')
self.tx_log.emit(twisted.logger.LogLevel.levelWithName(log_level), log_entry)
handler = ToTwistedHandler()
formatter = logging.Formatter(log_format)
handler.setFormatter(formatter)
handler.addFilter(LoggingContextFilter(request=""))
rootLogger.addHandler(handler)
def around(target):
@ -61,7 +75,7 @@ class TestCase(unittest.TestCase):
method = getattr(self, methodName)
level = getattr(method, "loglevel", getattr(self, "loglevel", NEVER))
level = getattr(method, "loglevel", getattr(self, "loglevel", logging.ERROR))
@around(self)
def setUp(orig):

View file

@ -0,0 +1,198 @@
from tests import unittest
from mock import patch
from synapse.util.caches.stream_change_cache import StreamChangeCache
class StreamChangeCacheTests(unittest.TestCase):
"""
Tests for StreamChangeCache.
"""
def test_prefilled_cache(self):
"""
Providing a prefilled cache to StreamChangeCache will result in a cache
with the prefilled-cache entered in.
"""
cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2})
self.assertTrue(cache.has_entity_changed("user@foo.com", 1))
def test_has_entity_changed(self):
"""
StreamChangeCache.entity_has_changed will mark entities as changed, and
has_entity_changed will observe the changed entities.
"""
cache = StreamChangeCache("#test", 3)
cache.entity_has_changed("user@foo.com", 6)
cache.entity_has_changed("bar@baz.net", 7)
# If it's been changed after that stream position, return True
self.assertTrue(cache.has_entity_changed("user@foo.com", 4))
self.assertTrue(cache.has_entity_changed("bar@baz.net", 4))
# If it's been changed at that stream position, return False
self.assertFalse(cache.has_entity_changed("user@foo.com", 6))
# If there's no changes after that stream position, return False
self.assertFalse(cache.has_entity_changed("user@foo.com", 7))
# If the entity does not exist, return False.
self.assertFalse(cache.has_entity_changed("not@here.website", 7))
# If we request before the stream cache's earliest known position,
# return True, whether it's a known entity or not.
self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
self.assertTrue(cache.has_entity_changed("not@here.website", 0))
@patch("synapse.util.caches.CACHE_SIZE_FACTOR", 1.0)
def test_has_entity_changed_pops_off_start(self):
"""
StreamChangeCache.entity_has_changed will respect the max size and
purge the oldest items upon reaching that max size.
"""
cache = StreamChangeCache("#test", 1, max_size=2)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
cache.entity_has_changed("user@elsewhere.org", 4)
# The cache is at the max size, 2
self.assertEqual(len(cache._cache), 2)
# The oldest item has been popped off
self.assertTrue("user@foo.com" not in cache._entity_to_key)
# If we update an existing entity, it keeps the two existing entities
cache.entity_has_changed("bar@baz.net", 5)
self.assertEqual(
set(["bar@baz.net", "user@elsewhere.org"]), set(cache._entity_to_key)
)
def test_get_all_entities_changed(self):
"""
StreamChangeCache.get_all_entities_changed will return all changed
entities since the given position. If the position is before the start
of the known stream, it returns None instead.
"""
cache = StreamChangeCache("#test", 1)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
cache.entity_has_changed("user@elsewhere.org", 4)
self.assertEqual(
cache.get_all_entities_changed(1),
["user@foo.com", "bar@baz.net", "user@elsewhere.org"],
)
self.assertEqual(
cache.get_all_entities_changed(2), ["bar@baz.net", "user@elsewhere.org"]
)
self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"])
self.assertEqual(cache.get_all_entities_changed(0), None)
def test_has_any_entity_changed(self):
"""
StreamChangeCache.has_any_entity_changed will return True if any
entities have been changed since the provided stream position, and
False if they have not. If the cache has entries and the provided
stream position is before it, it will return True, otherwise False if
the cache has no entries.
"""
cache = StreamChangeCache("#test", 1)
# With no entities, it returns False for the past, present, and future.
self.assertFalse(cache.has_any_entity_changed(0))
self.assertFalse(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(2))
# We add an entity
cache.entity_has_changed("user@foo.com", 2)
# With an entity, it returns True for the past, the stream start
# position, and False for the stream position the entity was changed
# on and ones after it.
self.assertTrue(cache.has_any_entity_changed(0))
self.assertTrue(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(2))
self.assertFalse(cache.has_any_entity_changed(3))
def test_get_entities_changed(self):
"""
StreamChangeCache.get_entities_changed will return the entities in the
given list that have changed since the provided stream ID. If the
stream position is earlier than the earliest known position, it will
return all of the entities queried for.
"""
cache = StreamChangeCache("#test", 1)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
cache.entity_has_changed("user@elsewhere.org", 4)
# Query all the entries, but mid-way through the stream. We should only
# get the ones after that point.
self.assertEqual(
cache.get_entities_changed(
["user@foo.com", "bar@baz.net", "user@elsewhere.org"], stream_pos=2
),
set(["bar@baz.net", "user@elsewhere.org"]),
)
# Query all the entries mid-way through the stream, but include one
# that doesn't exist in it. We should get back the one that doesn't
# exist, too.
self.assertEqual(
cache.get_entities_changed(
[
"user@foo.com",
"bar@baz.net",
"user@elsewhere.org",
"not@here.website",
],
stream_pos=2,
),
set(["bar@baz.net", "user@elsewhere.org", "not@here.website"]),
)
# Query all the entries, but before the first known point. We will get
# all the entries we queried for, including ones that don't exist.
self.assertEqual(
cache.get_entities_changed(
[
"user@foo.com",
"bar@baz.net",
"user@elsewhere.org",
"not@here.website",
],
stream_pos=0,
),
set(
[
"user@foo.com",
"bar@baz.net",
"user@elsewhere.org",
"not@here.website",
]
),
)
def test_max_pos(self):
"""
StreamChangeCache.get_max_pos_of_last_change will return the most
recent point where the entity could have changed. If the entity is not
known, the stream start is provided instead.
"""
cache = StreamChangeCache("#test", 1)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
cache.entity_has_changed("user@elsewhere.org", 4)
# Known entities will return the point where they were changed.
self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 2)
self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3)
self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4)
# Unknown entities will return the stream start position.
self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), 1)

30
tox.ini
View file

@ -52,33 +52,41 @@ commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
"{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/config \
tests/appservice/test_scheduler.py \
tests/api/test_filtering.py \
tests/api/test_ratelimiting.py \
tests/appservice \
tests/crypto \
tests/events \
tests/handlers/test_appservice.py \
tests/handlers/test_auth.py \
tests/handlers/test_device.py \
tests/handlers/test_directory.py \
tests/handlers/test_e2e_keys.py \
tests/handlers/test_presence.py \
tests/handlers/test_profile.py \
tests/handlers/test_register.py \
tests/replication/slave/storage/test_account_data.py \
tests/replication/slave/storage/test_receipts.py \
tests/storage/test_appservice.py \
tests/storage/test_background_update.py \
tests/storage/test_base.py \
tests/storage/test__base.py \
tests/storage/test_client_ips.py \
tests/storage/test_devices.py \
tests/storage/test_end_to_end_keys.py \
tests/storage/test_event_push_actions.py \
tests/storage/test_keys.py \
tests/storage/test_presence.py \
tests/storage/test_profile.py \
tests/storage/test_registration.py \
tests/storage/test_room.py \
tests/storage/test_user_directory.py \
tests/test_distributor.py \
tests/test_dns.py \
tests/test_preview.py \
tests/test_test_utils.py \
tests/test_types.py \
tests/util/test_dict_cache.py \
tests/util/test_expiring_cache.py \
tests/util/test_file_consumer.py \
tests/util/test_limiter.py \
tests/util/test_linearizer.py \
tests/util/test_logcontext.py \
tests/util/test_logformatter.py \
tests/util/test_rwlock.py \
tests/util/test_snapshot_cache.py \
tests/util/test_wheel_timer.py} \
tests/util} \
{env:TOXSUFFIX:}
{env:DUMP_COVERAGE_COMMAND:coverage report -m}