From 75daede92f041500347a5f446229be5ca50c2b8e Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Tue, 22 Mar 2016 18:22:52 +0000 Subject: [PATCH 1/6] String intern --- synapse/storage/state.py | 12 +++++++++--- synapse/util/caches/__init__.py | 8 ++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index eab2c5a8c..1982b1c60 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches import intern_string from twisted.internet import defer @@ -155,7 +156,9 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def get_current_state_for_key(self, room_id, event_type, state_key): - event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key) + event_ids = yield self._get_current_state_for_key( + room_id, intern_string(event_type), intern_string(state_key) + ) events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) @@ -202,7 +205,7 @@ class StateStore(SQLBaseStore): results = {} for row in rows: - key = (row["type"], row["state_key"]) + key = (intern_string(row["type"]), intern_string(row["state_key"])) results.setdefault(row["state_group"], {})[key] = row["event_id"] return results @@ -393,7 +396,10 @@ class StateStore(SQLBaseStore): # cache absence of the key, on the assumption that if we've # explicitly asked for some types then we will probably ask # for them again. - state_dict = {key: None for key in types} + state_dict = { + (intern_string(etype), intern_string(state_key)): None + for (etype, state_key) in types + } state_dict.update(results[group]) results[group] = state_dict else: diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 1a1490419..9d450fade 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. import synapse.metrics +from lrucache import LruCache DEBUG_CACHES = False @@ -25,3 +26,10 @@ cache_counter = metrics.register_cache( lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, labels=["name"], ) + +_string_cache = LruCache(5000) +caches_by_name["string_cache"] = _string_cache + + +def intern_string(string): + return _string_cache.setdefault(string, string) From fe9794706ab817fcedc17f99693eb0823b339d93 Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Wed, 23 Mar 2016 14:58:08 +0000 Subject: [PATCH 2/6] Intern type and state_key on events --- synapse/events/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index abed6b5e6..2ceac19ad 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.util.frozenutils import freeze +from synapse.util.caches import intern_string # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents @@ -140,6 +141,12 @@ class FrozenEvent(EventBase): unsigned = dict(event_dict.pop("unsigned", {})) + # We intern these strings because they turn up a lot (especially when + # caching). + event_dict["type"] = intern_string(event_dict["type"]) + if "state_key" in event_dict: + event_dict["state_key"] = intern_string(event_dict["state_key"]) + if USE_FROZEN_DICTS: frozen_dict = freeze(event_dict) else: From f96526ffc23fdd99ab47abda67fb579a1ad764f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Wed, 23 Mar 2016 15:01:05 +0000 Subject: [PATCH 3/6] Intern sender, event_id and room_id in events --- synapse/events/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 2ceac19ad..63004eaf0 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -146,6 +146,11 @@ class FrozenEvent(EventBase): event_dict["type"] = intern_string(event_dict["type"]) if "state_key" in event_dict: event_dict["state_key"] = intern_string(event_dict["state_key"]) + if "sender" in event_dict: + event_dict["sender"] = intern_string(event_dict["sender"]) + + event_dict["event_id"] = intern(event_dict["event_id"].encode('ascii')) + event_dict["room_id"] = intern(event_dict["room_id"].encode('ascii')) if USE_FROZEN_DICTS: frozen_dict = freeze(event_dict) From acdfef7b1443a8260c43e31e9944b74dfdf286dc Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Wed, 23 Mar 2016 16:13:05 +0000 Subject: [PATCH 4/6] Intern all the things --- synapse/events/__init__.py | 11 +---- synapse/federation/federation_client.py | 1 + synapse/federation/transport/server.py | 28 ++++++------ synapse/http/server.py | 10 +++-- synapse/storage/_base.py | 3 +- synapse/storage/receipts.py | 21 +++++---- synapse/storage/state.py | 10 ++--- synapse/util/caches/__init__.py | 58 ++++++++++++++++++++++++- 8 files changed, 97 insertions(+), 45 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 63004eaf0..23f8b612a 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.util.frozenutils import freeze -from synapse.util.caches import intern_string +from synapse.util.caches import intern_dict # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents @@ -143,14 +143,7 @@ class FrozenEvent(EventBase): # We intern these strings because they turn up a lot (especially when # caching). - event_dict["type"] = intern_string(event_dict["type"]) - if "state_key" in event_dict: - event_dict["state_key"] = intern_string(event_dict["state_key"]) - if "sender" in event_dict: - event_dict["sender"] = intern_string(event_dict["sender"]) - - event_dict["event_id"] = intern(event_dict["event_id"].encode('ascii')) - event_dict["room_id"] = intern(event_dict["room_id"].encode('ascii')) + event_dict = intern_dict(event_dict) if USE_FROZEN_DICTS: frozen_dict = freeze(event_dict) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 83c1f4658..37ee469fa 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -418,6 +418,7 @@ class FederationClient(FederationBase): "Failed to make_%s via %s: %s", membership, destination, e.message ) + raise raise RuntimeError("Failed to send to any server.") diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 208bff8d4..d65a7893d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -175,7 +175,7 @@ class BaseFederationServlet(object): class FederationSendServlet(BaseFederationServlet): - PATH = "/send/([^/]*)/" + PATH = "/send/(?P<transaction_id>[^/]*)/" def __init__(self, handler, server_name, **kwargs): super(FederationSendServlet, self).__init__( @@ -250,7 +250,7 @@ class FederationPullServlet(BaseFederationServlet): class FederationEventServlet(BaseFederationServlet): - PATH = "/event/([^/]*)/" + PATH = "/event/(?P<event_id>[^/]*)/" # This is when someone asks for a data item for a given server data_id pair. def on_GET(self, origin, content, query, event_id): @@ -258,7 +258,7 @@ class FederationEventServlet(BaseFederationServlet): class FederationStateServlet(BaseFederationServlet): - PATH = "/state/([^/]*)/" + PATH = "/state/(?P<context>[^/]*)/" # This is when someone asks for all data for a given context. def on_GET(self, origin, content, query, context): @@ -270,7 +270,7 @@ class FederationStateServlet(BaseFederationServlet): class FederationBackfillServlet(BaseFederationServlet): - PATH = "/backfill/([^/]*)/" + PATH = "/backfill/(?P<context>[^/]*)/" def on_GET(self, origin, content, query, context): versions = query["v"] @@ -285,7 +285,7 @@ class FederationBackfillServlet(BaseFederationServlet): class FederationQueryServlet(BaseFederationServlet): - PATH = "/query/([^/]*)" + PATH = "/query/(?P<query_type>[^/]*)" # This is when we receive a server-server Query def on_GET(self, origin, content, query, query_type): @@ -296,7 +296,7 @@ class FederationQueryServlet(BaseFederationServlet): class FederationMakeJoinServlet(BaseFederationServlet): - PATH = "/make_join/([^/]*)/([^/]*)" + PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)" @defer.inlineCallbacks def on_GET(self, origin, content, query, context, user_id): @@ -305,7 +305,7 @@ class FederationMakeJoinServlet(BaseFederationServlet): class FederationMakeLeaveServlet(BaseFederationServlet): - PATH = "/make_leave/([^/]*)/([^/]*)" + PATH = "/make_leave/(?P<context>[^/]*)/(?P<user_id>[^/]*)" @defer.inlineCallbacks def on_GET(self, origin, content, query, context, user_id): @@ -314,7 +314,7 @@ class FederationMakeLeaveServlet(BaseFederationServlet): class FederationSendLeaveServlet(BaseFederationServlet): - PATH = "/send_leave/([^/]*)/([^/]*)" + PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, room_id, txid): @@ -323,14 +323,14 @@ class FederationSendLeaveServlet(BaseFederationServlet): class FederationEventAuthServlet(BaseFederationServlet): - PATH = "/event_auth/([^/]*)/([^/]*)" + PATH = "/event_auth(?P<context>[^/]*)/(?P<event_id>[^/]*)" def on_GET(self, origin, content, query, context, event_id): return self.handler.on_event_auth(origin, context, event_id) class FederationSendJoinServlet(BaseFederationServlet): - PATH = "/send_join/([^/]*)/([^/]*)" + PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): @@ -341,7 +341,7 @@ class FederationSendJoinServlet(BaseFederationServlet): class FederationInviteServlet(BaseFederationServlet): - PATH = "/invite/([^/]*)/([^/]*)" + PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, context, event_id): @@ -352,7 +352,7 @@ class FederationInviteServlet(BaseFederationServlet): class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet): - PATH = "/exchange_third_party_invite/([^/]*)" + PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)" @defer.inlineCallbacks def on_PUT(self, origin, content, query, room_id): @@ -381,7 +381,7 @@ class FederationClientKeysClaimServlet(BaseFederationServlet): class FederationQueryAuthServlet(BaseFederationServlet): - PATH = "/query_auth/([^/]*)/([^/]*)" + PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)" @defer.inlineCallbacks def on_POST(self, origin, content, query, context, event_id): @@ -394,7 +394,7 @@ class FederationQueryAuthServlet(BaseFederationServlet): class FederationGetMissingEventsServlet(BaseFederationServlet): # TODO(paul): Why does this path alone end with "/?" optional? - PATH = "/get_missing_events/([^/]*)/?" + PATH = "/get_missing_events/(?P<room_id>[^/]*)/?" @defer.inlineCallbacks def on_POST(self, origin, content, query, room_id): diff --git a/synapse/http/server.py b/synapse/http/server.py index b17b190ee..b82196fd5 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -18,6 +18,7 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes ) from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.caches import intern_dict import synapse.metrics import synapse.events @@ -229,11 +230,12 @@ class JsonResource(HttpServer, resource.Resource): else: servlet_classname = "%r" % callback - args = [ - urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups() - ] + kwargs = intern_dict({ + name: urllib.unquote(value).decode("UTF-8") if value else value + for name, value in m.groupdict().items() + }) - callback_return = yield callback(request, *args) + callback_return = yield callback(request, **kwargs) if callback_return is not None: code, response = callback_return self._send_response(request, code, response) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 583b77a83..b75b79df3 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,6 +18,7 @@ from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache +from synapse.util.caches import intern_dict import synapse.metrics @@ -350,7 +351,7 @@ class SQLBaseStore(object): """ col_headers = list(column[0] for column in cursor.description) results = list( - dict(zip(col_headers, row)) for row in cursor.fetchall() + intern_dict(dict(zip(col_headers, row))) for row in cursor.fetchall() ) return results diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index dbc074d6b..6b9d848ea 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore): @cachedInlineCallbacks(num_args=2) def get_receipts_for_user(self, user_id, receipt_type): - def f(txn): - sql = ( - "SELECT room_id,event_id " - "FROM receipts_linearized " - "WHERE user_id = ? AND receipt_type = ? " - ) - txn.execute(sql, (user_id, receipt_type)) - return txn.fetchall() + rows = yield self._simple_select_list( + table="receipts_linearized", + keyvalues={ + "user_id": user_id, + "receipt_type": receipt_type, + }, + retcols=("room_id", "event_id"), + desc="get_receipts_for_user", + ) - defer.returnValue(dict( - (yield self.runInteraction("get_receipts_for_user", f)) - )) + defer.returnValue({row["room_id"]: row["event_id"] for row in rows}) @defer.inlineCallbacks def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1982b1c60..03eecbbbb 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -156,9 +156,7 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def get_current_state_for_key(self, room_id, event_type, state_key): - event_ids = yield self._get_current_state_for_key( - room_id, intern_string(event_type), intern_string(state_key) - ) + event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key) events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) @@ -205,7 +203,7 @@ class StateStore(SQLBaseStore): results = {} for row in rows: - key = (intern_string(row["type"]), intern_string(row["state_key"])) + key = (row["type"], row["state_key"]) results.setdefault(row["state_group"], {})[key] = row["event_id"] return results @@ -286,7 +284,9 @@ class StateStore(SQLBaseStore): desc="_get_state_group_for_events", ) - defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) + defer.returnValue({ + intern(row["event_id"].encode('ascii')): row["state_group"] for row in rows + }) def _get_some_state_from_cache(self, group, types): """Checks if group is in cache. See `_get_state_for_groups` diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 9d450fade..838cec45f 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -15,6 +15,9 @@ import synapse.metrics from lrucache import LruCache +import os + +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) DEBUG_CACHES = False @@ -27,9 +30,62 @@ cache_counter = metrics.register_cache( labels=["name"], ) -_string_cache = LruCache(5000) +_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR)) caches_by_name["string_cache"] = _string_cache +KNOWN_KEYS = { + key: key for key in + ( + "auth_events", + "content", + "depth", + "event_id", + "hashes", + "origin", + "origin_server_ts", + "prev_events", + "room_id", + "sender", + "signatures", + "state_key", + "type", + "unsigned", + "user_id", + ) +} + + def intern_string(string): + """Takes a (potentially) unicode string and interns using custom cache + """ return _string_cache.setdefault(string, string) + + +def intern_dict(dictionary): + """Takes a dictionary and interns well known keys and their values + """ + return _intern_known_values({ + _intern_key(key): value for key, value in dictionary.items() + }) + + +def _intern_known_values(dictionary): + intern_str_keys = ("event_id", "room_id") + intern_unicode_keys = ("sender", "user_id", "type", "state_key") + + for key in intern_str_keys: + val = dictionary.get(key, None) + if val is not None: + dictionary[key] = intern(val.encode('ascii')) + + for key in intern_unicode_keys: + val = dictionary.get(key, None) + if val is not None: + dictionary[key] = intern_string(val) + + return dictionary + + +def _intern_key(key): + return KNOWN_KEYS.get(key, key) From 2f0180b09e2a2afeed418a5840ae6b4fffcb4be4 Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Wed, 23 Mar 2016 16:29:46 +0000 Subject: [PATCH 5/6] Don't bother interning keys that are already interned --- synapse/storage/state.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 03eecbbbb..02cefdff2 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -284,9 +284,7 @@ class StateStore(SQLBaseStore): desc="_get_state_group_for_events", ) - defer.returnValue({ - intern(row["event_id"].encode('ascii')): row["state_group"] for row in rows - }) + defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) def _get_some_state_from_cache(self, group, types): """Checks if group is in cache. See `_get_state_for_groups` From 8122ad7bab74b1a52188e350ca605033a9eca28e Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Wed, 23 Mar 2016 16:34:59 +0000 Subject: [PATCH 6/6] Simplify intern_dict --- synapse/util/caches/__init__.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 838cec45f..d53569ca4 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -65,27 +65,20 @@ def intern_string(string): def intern_dict(dictionary): """Takes a dictionary and interns well known keys and their values """ - return _intern_known_values({ - _intern_key(key): value for key, value in dictionary.items() - }) + return { + KNOWN_KEYS.get(key, key): _intern_known_values(key, value) + for key, value in dictionary.items() + } -def _intern_known_values(dictionary): +def _intern_known_values(key, value): intern_str_keys = ("event_id", "room_id") intern_unicode_keys = ("sender", "user_id", "type", "state_key") - for key in intern_str_keys: - val = dictionary.get(key, None) - if val is not None: - dictionary[key] = intern(val.encode('ascii')) + if key in intern_str_keys: + return intern(value.encode('ascii')) - for key in intern_unicode_keys: - val = dictionary.get(key, None) - if val is not None: - dictionary[key] = intern_string(val) + if key in intern_unicode_keys: + return intern_string(value) - return dictionary - - -def _intern_key(key): - return KNOWN_KEYS.get(key, key) + return value