forked from MirrorHub/synapse
Merge pull request #694 from matrix-org/markjh/caches
Move _get_cache_dict into the SQLBaseStore
This commit is contained in:
commit
7d11f825aa
2 changed files with 34 additions and 33 deletions
|
@ -177,39 +177,6 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
self.__presence_on_startup = None
|
self.__presence_on_startup = None
|
||||||
return active_on_startup
|
return active_on_startup
|
||||||
|
|
||||||
def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
|
|
||||||
# Fetch a mapping of room_id -> max stream position for "recent" rooms.
|
|
||||||
# It doesn't really matter how many we get, the StreamChangeCache will
|
|
||||||
# do the right thing to ensure it respects the max size of cache.
|
|
||||||
sql = (
|
|
||||||
"SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
|
|
||||||
" WHERE %(stream)s > ? - 100000"
|
|
||||||
" GROUP BY %(entity)s"
|
|
||||||
) % {
|
|
||||||
"table": table,
|
|
||||||
"entity": entity_column,
|
|
||||||
"stream": stream_column,
|
|
||||||
}
|
|
||||||
|
|
||||||
sql = self.database_engine.convert_param_style(sql)
|
|
||||||
|
|
||||||
txn = db_conn.cursor()
|
|
||||||
txn.execute(sql, (int(max_value),))
|
|
||||||
rows = txn.fetchall()
|
|
||||||
txn.close()
|
|
||||||
|
|
||||||
cache = {
|
|
||||||
row[0]: int(row[1])
|
|
||||||
for row in rows
|
|
||||||
}
|
|
||||||
|
|
||||||
if cache:
|
|
||||||
min_val = min(cache.values())
|
|
||||||
else:
|
|
||||||
min_val = max_value
|
|
||||||
|
|
||||||
return cache, min_val
|
|
||||||
|
|
||||||
def _get_active_presence(self, db_conn):
|
def _get_active_presence(self, db_conn):
|
||||||
"""Fetch non-offline presence from the database so that we can register
|
"""Fetch non-offline presence from the database so that we can register
|
||||||
the appropriate time outs.
|
the appropriate time outs.
|
||||||
|
|
|
@ -816,6 +816,40 @@ class SQLBaseStore(object):
|
||||||
self._next_stream_id += 1
|
self._next_stream_id += 1
|
||||||
return i
|
return i
|
||||||
|
|
||||||
|
def _get_cache_dict(self, db_conn, table, entity_column, stream_column,
|
||||||
|
max_value):
|
||||||
|
# Fetch a mapping of room_id -> max stream position for "recent" rooms.
|
||||||
|
# It doesn't really matter how many we get, the StreamChangeCache will
|
||||||
|
# do the right thing to ensure it respects the max size of cache.
|
||||||
|
sql = (
|
||||||
|
"SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
|
||||||
|
" WHERE %(stream)s > ? - 100000"
|
||||||
|
" GROUP BY %(entity)s"
|
||||||
|
) % {
|
||||||
|
"table": table,
|
||||||
|
"entity": entity_column,
|
||||||
|
"stream": stream_column,
|
||||||
|
}
|
||||||
|
|
||||||
|
sql = self.database_engine.convert_param_style(sql)
|
||||||
|
|
||||||
|
txn = db_conn.cursor()
|
||||||
|
txn.execute(sql, (int(max_value),))
|
||||||
|
rows = txn.fetchall()
|
||||||
|
txn.close()
|
||||||
|
|
||||||
|
cache = {
|
||||||
|
row[0]: int(row[1])
|
||||||
|
for row in rows
|
||||||
|
}
|
||||||
|
|
||||||
|
if cache:
|
||||||
|
min_val = min(cache.values())
|
||||||
|
else:
|
||||||
|
min_val = max_value
|
||||||
|
|
||||||
|
return cache, min_val
|
||||||
|
|
||||||
|
|
||||||
class _RollbackButIsFineException(Exception):
|
class _RollbackButIsFineException(Exception):
|
||||||
""" This exception is used to rollback a transaction without implying
|
""" This exception is used to rollback a transaction without implying
|
||||||
|
|
Loading…
Reference in a new issue