forked from MirrorHub/synapse
Rename table. Add docs.
This commit is contained in:
parent
784a2d4f2c
commit
d9664344ec
4 changed files with 16 additions and 6 deletions
|
@ -29,7 +29,7 @@ class BaseSlavedStore(SQLBaseStore):
|
||||||
super(BaseSlavedStore, self).__init__(hs)
|
super(BaseSlavedStore, self).__init__(hs)
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
self._cache_id_gen = SlavedIdTracker(
|
self._cache_id_gen = SlavedIdTracker(
|
||||||
db_conn, "cache_stream", "stream_id",
|
db_conn, "cache_invalidation_stream", "stream_id",
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self._cache_id_gen = None
|
self._cache_id_gen = None
|
||||||
|
|
|
@ -126,7 +126,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
|
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
self._cache_id_gen = StreamIdGenerator(
|
self._cache_id_gen = StreamIdGenerator(
|
||||||
db_conn, "cache_stream", "stream_id",
|
db_conn, "cache_invalidation_stream", "stream_id",
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self._cache_id_gen = None
|
self._cache_id_gen = None
|
||||||
|
|
|
@ -863,6 +863,13 @@ class SQLBaseStore(object):
|
||||||
return cache, min_val
|
return cache, min_val
|
||||||
|
|
||||||
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
|
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
|
||||||
|
"""Invalidates the cache and adds it to the cache stream so slaves
|
||||||
|
will know to invalidate their caches.
|
||||||
|
|
||||||
|
This should only be used to invalidate caches where slaves won't
|
||||||
|
otherwise know from other replication streams that the cache should
|
||||||
|
be invalidated.
|
||||||
|
"""
|
||||||
txn.call_after(cache_func.invalidate, keys)
|
txn.call_after(cache_func.invalidate, keys)
|
||||||
|
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
|
@ -872,7 +879,7 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
self._simple_insert_txn(
|
self._simple_insert_txn(
|
||||||
txn,
|
txn,
|
||||||
table="cache_stream",
|
table="cache_invalidation_stream",
|
||||||
values={
|
values={
|
||||||
"stream_id": stream_id,
|
"stream_id": stream_id,
|
||||||
"cache_func": cache_func.__name__,
|
"cache_func": cache_func.__name__,
|
||||||
|
@ -887,7 +894,8 @@ class SQLBaseStore(object):
|
||||||
# send across cache invalidations as quickly as possible. Cache
|
# send across cache invalidations as quickly as possible. Cache
|
||||||
# invalidations are idempotent, so duplicates are fine.
|
# invalidations are idempotent, so duplicates are fine.
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT stream_id, cache_func, keys, invalidation_ts FROM cache_stream"
|
"SELECT stream_id, cache_func, keys, invalidation_ts"
|
||||||
|
" FROM cache_invalidation_stream"
|
||||||
" WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
|
" WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
|
||||||
)
|
)
|
||||||
txn.execute(sql, (last_id, limit,))
|
txn.execute(sql, (last_id, limit,))
|
||||||
|
|
|
@ -20,15 +20,17 @@ import logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# This stream is used to notify replication slaves that some caches have
|
||||||
|
# been invalidated that they cannot infer from the other streams.
|
||||||
CREATE_TABLE = """
|
CREATE_TABLE = """
|
||||||
CREATE TABLE cache_stream (
|
CREATE TABLE cache_invalidation_stream (
|
||||||
stream_id BIGINT,
|
stream_id BIGINT,
|
||||||
cache_func TEXT,
|
cache_func TEXT,
|
||||||
keys TEXT[],
|
keys TEXT[],
|
||||||
invalidation_ts BIGINT
|
invalidation_ts BIGINT
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE INDEX cache_stream_id ON cache_stream(stream_id);
|
CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue