From 18579534ea67f2d98c189e2ddeccc4bfecb491eb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 14:37:59 +0000 Subject: [PATCH 1/6] Prefill stream change caches --- synapse/storage/__init__.py | 49 +++++++++++++++++++++- synapse/storage/account_data.py | 9 ---- synapse/storage/stream.py | 8 ---- synapse/util/caches/stream_change_cache.py | 5 ++- 4 files changed, 52 insertions(+), 19 deletions(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index eb8884230..95ae97d50 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -45,9 +45,10 @@ from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore - from util.id_generators import IdGenerator, StreamIdGenerator +from synapse.util.caches.stream_change_cache import StreamChangeCache + import logging @@ -117,8 +118,54 @@ class DataStore(RoomMemberStore, RoomStore, self._push_rule_id_gen = IdGenerator("push_rules", "id", self) self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) + events_max = self._stream_id_gen.get_max_token(None) + event_cache_prefill = self._get_cache_dict( + db_conn, "events", + entity_column="room_id", + stream_column="stream_ordering", + max_value=events_max, + ) + self._events_stream_cache = StreamChangeCache( + "EventsRoomStreamChangeCache", events_max, + prefilled_cache=event_cache_prefill, + ) + + account_max = self._account_data_id_gen.get_max_token(None) + account_cache_prefill = self._get_cache_dict( + db_conn, "account_data", + entity_column="user_id", + stream_column="stream_id", + max_value=account_max, + ) + self._account_data_stream_cache = StreamChangeCache( + "AccountDataAndTagsChangeCache", account_max, + prefilled_cache=account_cache_prefill, + ) + super(DataStore, self).__init__(hs) + def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value): + sql = ( + "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s" + " WHERE %(stream)s > max(? - 100000, 0)" + " GROUP BY %(entity)s" + " ORDER BY MAX(%(stream)s) DESC" + " LIMIT 10000" + ) % { + "table": table, + "entity": entity_column, + "stream": stream_column, + } + + txn = db_conn.cursor() + txn.execute(sql, (int(max_value),)) + rows = txn.fetchall() + + return { + row[0]: row[1] + for row in rows + } + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent): now = int(self._clock.time_msec()) diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index ed6587429..625d062eb 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -14,7 +14,6 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.stream_change_cache import StreamChangeCache from twisted.internet import defer import ujson as json @@ -24,14 +23,6 @@ logger = logging.getLogger(__name__) class AccountDataStore(SQLBaseStore): - def __init__(self, hs): - super(AccountDataStore, self).__init__(hs) - - self._account_data_stream_cache = StreamChangeCache( - "AccountDataAndTagsChangeCache", - self._account_data_id_gen.get_max_token(None), - max_size=10000, - ) def get_account_data_for_user(self, user_id): """Get all the client account_data for a user. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6e81d46c6..e245d2f91 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -37,7 +37,6 @@ from twisted.internet import defer from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks -from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -78,13 +77,6 @@ def upper_bound(token): class StreamStore(SQLBaseStore): - def __init__(self, hs): - super(StreamStore, self).__init__(hs) - - self._events_stream_cache = StreamChangeCache( - "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None) - ) - @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): # NB this lives here instead of appservice.py so we can reuse the diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index c673b1bdf..891cb619f 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -32,7 +32,7 @@ class StreamChangeCache(object): entities that may have changed since that position. If position key is too old then the cache will simply return all given entities. """ - def __init__(self, name, current_stream_pos, max_size=10000): + def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}): self._max_size = max_size self._entity_to_key = {} self._cache = sorteddict() @@ -40,6 +40,9 @@ class StreamChangeCache(object): self.name = name caches_by_name[self.name] = self._cache + for entity, stream_pos in prefilled_cache.items(): + self.entity_has_changed(entity, stream_pos) + def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos """ From f67d60496a8a9b2c95fcacb6d4c539a1d4b6a105 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 14:41:16 +0000 Subject: [PATCH 2/6] Convert param style --- synapse/storage/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 95ae97d50..2ed505cb1 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -85,6 +85,7 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, db_conn, hs): self.hs = hs + self.database_engine = hs.database_engine cur = db_conn.cursor() try: @@ -157,6 +158,8 @@ class DataStore(RoomMemberStore, RoomStore, "stream": stream_column, } + sql = self.database_engine.convert_param_style(sql) + txn = db_conn.cursor() txn.execute(sql, (int(max_value),)) rows = txn.fetchall() From 45488e0ffae5100c3a82568642736aff203e1602 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 14:42:01 +0000 Subject: [PATCH 3/6] Max is not a function --- synapse/storage/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 2ed505cb1..4d374a8b0 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -148,7 +148,7 @@ class DataStore(RoomMemberStore, RoomStore, def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value): sql = ( "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s" - " WHERE %(stream)s > max(? - 100000, 0)" + " WHERE %(stream)s > ? - 100000" " GROUP BY %(entity)s" " ORDER BY MAX(%(stream)s) DESC" " LIMIT 10000" From 3d60686c0ceeb88c4f6269110e92dc0c7bf5a3b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 14:49:11 +0000 Subject: [PATCH 4/6] Actually use cache --- synapse/storage/__init__.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4d374a8b0..957fff3c2 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -120,26 +120,26 @@ class DataStore(RoomMemberStore, RoomStore, self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) events_max = self._stream_id_gen.get_max_token(None) - event_cache_prefill = self._get_cache_dict( + event_cache_prefill, min_event_val = self._get_cache_dict( db_conn, "events", entity_column="room_id", stream_column="stream_ordering", max_value=events_max, ) self._events_stream_cache = StreamChangeCache( - "EventsRoomStreamChangeCache", events_max, + "EventsRoomStreamChangeCache", min_event_val, prefilled_cache=event_cache_prefill, ) account_max = self._account_data_id_gen.get_max_token(None) - account_cache_prefill = self._get_cache_dict( + account_cache_prefill, min_acc_val = self._get_cache_dict( db_conn, "account_data", entity_column="user_id", stream_column="stream_id", max_value=account_max, ) self._account_data_stream_cache = StreamChangeCache( - "AccountDataAndTagsChangeCache", account_max, + "AccountDataAndTagsChangeCache", min_acc_val, prefilled_cache=account_cache_prefill, ) @@ -151,7 +151,6 @@ class DataStore(RoomMemberStore, RoomStore, " WHERE %(stream)s > ? - 100000" " GROUP BY %(entity)s" " ORDER BY MAX(%(stream)s) DESC" - " LIMIT 10000" ) % { "table": table, "entity": entity_column, @@ -164,11 +163,18 @@ class DataStore(RoomMemberStore, RoomStore, txn.execute(sql, (int(max_value),)) rows = txn.fetchall() - return { - row[0]: row[1] + cache = { + row[0]: int(row[1]) for row in rows } + if cache: + min_val = min(cache.values()) + else: + min_val = max_value + + return cache, min_val + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent): now = int(self._clock.time_msec()) From b5dbced9389d072d4bd15002c7ddffba9e54340e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 14:53:59 +0000 Subject: [PATCH 5/6] Don't prefill account data --- synapse/storage/__init__.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 957fff3c2..a6cb58856 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -132,15 +132,8 @@ class DataStore(RoomMemberStore, RoomStore, ) account_max = self._account_data_id_gen.get_max_token(None) - account_cache_prefill, min_acc_val = self._get_cache_dict( - db_conn, "account_data", - entity_column="user_id", - stream_column="stream_id", - max_value=account_max, - ) self._account_data_stream_cache = StreamChangeCache( - "AccountDataAndTagsChangeCache", min_acc_val, - prefilled_cache=account_cache_prefill, + "AccountDataAndTagsChangeCache", account_max, ) super(DataStore, self).__init__(hs) From 8da95b6f1bb1a37597f0b89c4da88b064401b0b8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jan 2016 15:39:17 +0000 Subject: [PATCH 6/6] Comment. Remove superfluous order by --- synapse/storage/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a6cb58856..ee2153737 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -139,11 +139,13 @@ class DataStore(RoomMemberStore, RoomStore, super(DataStore, self).__init__(hs) def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value): + # Fetch a mapping of room_id -> max stream position for "recent" rooms. + # It doesn't really matter how many we get, the StreamChangeCache will + # do the right thing to ensure it respects the max size of cache. sql = ( "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s" " WHERE %(stream)s > ? - 100000" " GROUP BY %(entity)s" - " ORDER BY MAX(%(stream)s) DESC" ) % { "table": table, "entity": entity_column,