mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 16:23:51 +01:00
Merge pull request #540 from matrix-org/erikj/sync
Prefill stream change caches
This commit is contained in:
commit
c1de91aca4
4 changed files with 56 additions and 19 deletions
|
@ -45,9 +45,10 @@ from .search import SearchStore
|
||||||
from .tags import TagsStore
|
from .tags import TagsStore
|
||||||
from .account_data import AccountDataStore
|
from .account_data import AccountDataStore
|
||||||
|
|
||||||
|
|
||||||
from util.id_generators import IdGenerator, StreamIdGenerator
|
from util.id_generators import IdGenerator, StreamIdGenerator
|
||||||
|
|
||||||
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
@ -84,6 +85,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
|
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
|
self.database_engine = hs.database_engine
|
||||||
|
|
||||||
cur = db_conn.cursor()
|
cur = db_conn.cursor()
|
||||||
try:
|
try:
|
||||||
|
@ -117,8 +119,57 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
|
self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
|
||||||
self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
|
self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
|
||||||
|
|
||||||
|
events_max = self._stream_id_gen.get_max_token(None)
|
||||||
|
event_cache_prefill, min_event_val = self._get_cache_dict(
|
||||||
|
db_conn, "events",
|
||||||
|
entity_column="room_id",
|
||||||
|
stream_column="stream_ordering",
|
||||||
|
max_value=events_max,
|
||||||
|
)
|
||||||
|
self._events_stream_cache = StreamChangeCache(
|
||||||
|
"EventsRoomStreamChangeCache", min_event_val,
|
||||||
|
prefilled_cache=event_cache_prefill,
|
||||||
|
)
|
||||||
|
|
||||||
|
account_max = self._account_data_id_gen.get_max_token(None)
|
||||||
|
self._account_data_stream_cache = StreamChangeCache(
|
||||||
|
"AccountDataAndTagsChangeCache", account_max,
|
||||||
|
)
|
||||||
|
|
||||||
super(DataStore, self).__init__(hs)
|
super(DataStore, self).__init__(hs)
|
||||||
|
|
||||||
|
def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
|
||||||
|
# Fetch a mapping of room_id -> max stream position for "recent" rooms.
|
||||||
|
# It doesn't really matter how many we get, the StreamChangeCache will
|
||||||
|
# do the right thing to ensure it respects the max size of cache.
|
||||||
|
sql = (
|
||||||
|
"SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
|
||||||
|
" WHERE %(stream)s > ? - 100000"
|
||||||
|
" GROUP BY %(entity)s"
|
||||||
|
) % {
|
||||||
|
"table": table,
|
||||||
|
"entity": entity_column,
|
||||||
|
"stream": stream_column,
|
||||||
|
}
|
||||||
|
|
||||||
|
sql = self.database_engine.convert_param_style(sql)
|
||||||
|
|
||||||
|
txn = db_conn.cursor()
|
||||||
|
txn.execute(sql, (int(max_value),))
|
||||||
|
rows = txn.fetchall()
|
||||||
|
|
||||||
|
cache = {
|
||||||
|
row[0]: int(row[1])
|
||||||
|
for row in rows
|
||||||
|
}
|
||||||
|
|
||||||
|
if cache:
|
||||||
|
min_val = min(cache.values())
|
||||||
|
else:
|
||||||
|
min_val = max_value
|
||||||
|
|
||||||
|
return cache, min_val
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def insert_client_ip(self, user, access_token, ip, user_agent):
|
def insert_client_ip(self, user, access_token, ip, user_agent):
|
||||||
now = int(self._clock.time_msec())
|
now = int(self._clock.time_msec())
|
||||||
|
|
|
@ -14,7 +14,6 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
import ujson as json
|
import ujson as json
|
||||||
|
@ -24,14 +23,6 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class AccountDataStore(SQLBaseStore):
|
class AccountDataStore(SQLBaseStore):
|
||||||
def __init__(self, hs):
|
|
||||||
super(AccountDataStore, self).__init__(hs)
|
|
||||||
|
|
||||||
self._account_data_stream_cache = StreamChangeCache(
|
|
||||||
"AccountDataAndTagsChangeCache",
|
|
||||||
self._account_data_id_gen.get_max_token(None),
|
|
||||||
max_size=10000,
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_account_data_for_user(self, user_id):
|
def get_account_data_for_user(self, user_id):
|
||||||
"""Get all the client account_data for a user.
|
"""Get all the client account_data for a user.
|
||||||
|
|
|
@ -37,7 +37,6 @@ from twisted.internet import defer
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
|
@ -78,13 +77,6 @@ def upper_bound(token):
|
||||||
|
|
||||||
|
|
||||||
class StreamStore(SQLBaseStore):
|
class StreamStore(SQLBaseStore):
|
||||||
def __init__(self, hs):
|
|
||||||
super(StreamStore, self).__init__(hs)
|
|
||||||
|
|
||||||
self._events_stream_cache = StreamChangeCache(
|
|
||||||
"EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
|
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
|
||||||
# NB this lives here instead of appservice.py so we can reuse the
|
# NB this lives here instead of appservice.py so we can reuse the
|
||||||
|
|
|
@ -32,7 +32,7 @@ class StreamChangeCache(object):
|
||||||
entities that may have changed since that position. If position key is too
|
entities that may have changed since that position. If position key is too
|
||||||
old then the cache will simply return all given entities.
|
old then the cache will simply return all given entities.
|
||||||
"""
|
"""
|
||||||
def __init__(self, name, current_stream_pos, max_size=10000):
|
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
|
||||||
self._max_size = max_size
|
self._max_size = max_size
|
||||||
self._entity_to_key = {}
|
self._entity_to_key = {}
|
||||||
self._cache = sorteddict()
|
self._cache = sorteddict()
|
||||||
|
@ -40,6 +40,9 @@ class StreamChangeCache(object):
|
||||||
self.name = name
|
self.name = name
|
||||||
caches_by_name[self.name] = self._cache
|
caches_by_name[self.name] = self._cache
|
||||||
|
|
||||||
|
for entity, stream_pos in prefilled_cache.items():
|
||||||
|
self.entity_has_changed(entity, stream_pos)
|
||||||
|
|
||||||
def has_entity_changed(self, entity, stream_pos):
|
def has_entity_changed(self, entity, stream_pos):
|
||||||
"""Returns True if the entity may have been updated since stream_pos
|
"""Returns True if the entity may have been updated since stream_pos
|
||||||
"""
|
"""
|
||||||
|
|
Loading…
Reference in a new issue