forked from MirrorHub/synapse
Don't require config to create database
This commit is contained in:
parent
2e308a3a38
commit
8aab9d87fa
13 changed files with 69 additions and 86 deletions
|
@ -19,6 +19,7 @@ from twisted.enterprise import adbapi
|
|||
|
||||
from synapse.storage._base import LoggingTransaction, SQLBaseStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.prepare_database import prepare_database
|
||||
|
||||
import argparse
|
||||
import curses
|
||||
|
@ -37,6 +38,7 @@ BOOLEAN_COLUMNS = {
|
|||
"rooms": ["is_public"],
|
||||
"event_edges": ["is_state"],
|
||||
"presence_list": ["accepted"],
|
||||
"presence_stream": ["currently_active"],
|
||||
}
|
||||
|
||||
|
||||
|
@ -292,7 +294,7 @@ class Porter(object):
|
|||
}
|
||||
)
|
||||
|
||||
database_engine.prepare_database(db_conn)
|
||||
prepare_database(db_conn, database_engine, config=None)
|
||||
|
||||
db_conn.commit()
|
||||
|
||||
|
@ -309,8 +311,8 @@ class Porter(object):
|
|||
**self.postgres_config["args"]
|
||||
)
|
||||
|
||||
sqlite_engine = create_engine(FakeConfig(sqlite_config))
|
||||
postgres_engine = create_engine(FakeConfig(postgres_config))
|
||||
sqlite_engine = create_engine(sqlite_config)
|
||||
postgres_engine = create_engine(postgres_config)
|
||||
|
||||
self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
|
||||
self.postgres_store = Store(postgres_db_pool, postgres_engine)
|
||||
|
@ -792,8 +794,3 @@ if __name__ == "__main__":
|
|||
if end_error_exec_info:
|
||||
exc_type, exc_value, exc_traceback = end_error_exec_info
|
||||
traceback.print_exception(exc_type, exc_value, exc_traceback)
|
||||
|
||||
|
||||
class FakeConfig:
|
||||
def __init__(self, database_config):
|
||||
self.database_config = database_config
|
||||
|
|
|
@ -33,7 +33,7 @@ from synapse.python_dependencies import (
|
|||
from synapse.rest import ClientRestResource
|
||||
from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
|
||||
from synapse.storage import are_all_users_on_domain
|
||||
from synapse.storage.prepare_database import UpgradeDatabaseException
|
||||
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
|
||||
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
@ -245,7 +245,7 @@ class SynapseHomeServer(HomeServer):
|
|||
except IncorrectDatabaseSetup as e:
|
||||
quit_with_error(e.message)
|
||||
|
||||
def get_db_conn(self):
|
||||
def get_db_conn(self, run_new_connection=True):
|
||||
# Any param beginning with cp_ is a parameter for adbapi, and should
|
||||
# not be passed to the database engine.
|
||||
db_params = {
|
||||
|
@ -254,6 +254,7 @@ class SynapseHomeServer(HomeServer):
|
|||
}
|
||||
db_conn = self.database_engine.module.connect(**db_params)
|
||||
|
||||
if run_new_connection:
|
||||
self.database_engine.on_new_connection(db_conn)
|
||||
return db_conn
|
||||
|
||||
|
@ -386,7 +387,7 @@ def setup(config_options):
|
|||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
|
||||
database_engine = create_engine(config)
|
||||
database_engine = create_engine(config.database_config)
|
||||
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
|
||||
|
||||
hs = SynapseHomeServer(
|
||||
|
@ -402,8 +403,10 @@ def setup(config_options):
|
|||
logger.info("Preparing database: %s...", config.database_config['name'])
|
||||
|
||||
try:
|
||||
db_conn = hs.get_db_conn()
|
||||
database_engine.prepare_database(db_conn)
|
||||
db_conn = hs.get_db_conn(run_new_connection=False)
|
||||
prepare_database(db_conn, database_engine, config=config)
|
||||
database_engine.on_new_connection(db_conn)
|
||||
|
||||
hs.run_startup_checks(db_conn, database_engine)
|
||||
|
||||
db_conn.commit()
|
||||
|
|
|
@ -26,13 +26,13 @@ SUPPORTED_MODULE = {
|
|||
}
|
||||
|
||||
|
||||
def create_engine(config):
|
||||
name = config.database_config["name"]
|
||||
def create_engine(database_config):
|
||||
name = database_config["name"]
|
||||
engine_class = SUPPORTED_MODULE.get(name, None)
|
||||
|
||||
if engine_class:
|
||||
module = importlib.import_module(name)
|
||||
return engine_class(module, config=config)
|
||||
return engine_class(module)
|
||||
|
||||
raise RuntimeError(
|
||||
"Unsupported database engine '%s'" % (name,)
|
||||
|
|
|
@ -13,18 +13,15 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.prepare_database import prepare_database
|
||||
|
||||
from ._base import IncorrectDatabaseSetup
|
||||
|
||||
|
||||
class PostgresEngine(object):
|
||||
single_threaded = False
|
||||
|
||||
def __init__(self, database_module, config):
|
||||
def __init__(self, database_module):
|
||||
self.module = database_module
|
||||
self.module.extensions.register_type(self.module.extensions.UNICODE)
|
||||
self.config = config
|
||||
|
||||
def check_database(self, txn):
|
||||
txn.execute("SHOW SERVER_ENCODING")
|
||||
|
@ -44,9 +41,6 @@ class PostgresEngine(object):
|
|||
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
|
||||
)
|
||||
|
||||
def prepare_database(self, db_conn):
|
||||
prepare_database(db_conn, self, config=self.config)
|
||||
|
||||
def is_deadlock(self, error):
|
||||
if isinstance(error, self.module.DatabaseError):
|
||||
return error.pgcode in ["40001", "40P01"]
|
||||
|
|
|
@ -13,9 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage.prepare_database import (
|
||||
prepare_database, prepare_sqlite3_database
|
||||
)
|
||||
from synapse.storage.prepare_database import prepare_database
|
||||
|
||||
import struct
|
||||
|
||||
|
@ -23,9 +21,8 @@ import struct
|
|||
class Sqlite3Engine(object):
|
||||
single_threaded = True
|
||||
|
||||
def __init__(self, database_module, config):
|
||||
def __init__(self, database_module):
|
||||
self.module = database_module
|
||||
self.config = config
|
||||
|
||||
def check_database(self, txn):
|
||||
pass
|
||||
|
@ -34,13 +31,9 @@ class Sqlite3Engine(object):
|
|||
return sql
|
||||
|
||||
def on_new_connection(self, db_conn):
|
||||
self.prepare_database(db_conn)
|
||||
prepare_database(db_conn, self, config=None)
|
||||
db_conn.create_function("rank", 1, _rank)
|
||||
|
||||
def prepare_database(self, db_conn):
|
||||
prepare_sqlite3_database(db_conn)
|
||||
prepare_database(db_conn, self, config=self.config)
|
||||
|
||||
def is_deadlock(self, error):
|
||||
return False
|
||||
|
||||
|
|
|
@ -53,6 +53,9 @@ class UpgradeDatabaseException(PrepareDatabaseException):
|
|||
def prepare_database(db_conn, database_engine, config):
|
||||
"""Prepares a database for usage. Will either create all necessary tables
|
||||
or upgrade from an older schema version.
|
||||
|
||||
If `config` is None then prepare_database will assert that no upgrade is
|
||||
necessary, *or* will create a fresh database if the database is empty.
|
||||
"""
|
||||
try:
|
||||
cur = db_conn.cursor()
|
||||
|
@ -60,13 +63,18 @@ def prepare_database(db_conn, database_engine, config):
|
|||
|
||||
if version_info:
|
||||
user_version, delta_files, upgraded = version_info
|
||||
|
||||
if config is None:
|
||||
if user_version != SCHEMA_VERSION:
|
||||
# If we don't pass in a config file then we are expecting to
|
||||
# have already upgraded the DB.
|
||||
raise UpgradeDatabaseException("Database needs to be upgraded")
|
||||
else:
|
||||
_upgrade_existing_database(
|
||||
cur, user_version, delta_files, upgraded, database_engine, config
|
||||
)
|
||||
else:
|
||||
_setup_new_database(cur, database_engine, config)
|
||||
|
||||
# cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
|
||||
_setup_new_database(cur, database_engine)
|
||||
|
||||
cur.close()
|
||||
db_conn.commit()
|
||||
|
@ -75,7 +83,7 @@ def prepare_database(db_conn, database_engine, config):
|
|||
raise
|
||||
|
||||
|
||||
def _setup_new_database(cur, database_engine, config):
|
||||
def _setup_new_database(cur, database_engine):
|
||||
"""Sets up the database by finding a base set of "full schemas" and then
|
||||
applying any necessary deltas.
|
||||
|
||||
|
@ -148,12 +156,13 @@ def _setup_new_database(cur, database_engine, config):
|
|||
applied_delta_files=[],
|
||||
upgraded=False,
|
||||
database_engine=database_engine,
|
||||
config=config,
|
||||
config=None,
|
||||
is_empty=True,
|
||||
)
|
||||
|
||||
|
||||
def _upgrade_existing_database(cur, current_version, applied_delta_files,
|
||||
upgraded, database_engine, config):
|
||||
upgraded, database_engine, config, is_empty=False):
|
||||
"""Upgrades an existing database.
|
||||
|
||||
Delta files can either be SQL stored in *.sql files, or python modules
|
||||
|
@ -246,6 +255,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
|
|||
module_name, absolute_path, python_file
|
||||
)
|
||||
logger.debug("Running script %s", relative_path)
|
||||
module.run_create(cur, database_engine)
|
||||
if not is_empty:
|
||||
module.run_upgrade(cur, database_engine, config=config)
|
||||
elif ext == ".pyc":
|
||||
# Sometimes .pyc files turn up anyway even though we've
|
||||
|
@ -361,36 +372,3 @@ def _get_or_create_schema_state(txn, database_engine):
|
|||
return current_version, applied_deltas, upgraded
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def prepare_sqlite3_database(db_conn):
|
||||
"""This function should be called before `prepare_database` on sqlite3
|
||||
databases.
|
||||
|
||||
Since we changed the way we store the current schema version and handle
|
||||
updates to schemas, we need a way to upgrade from the old method to the
|
||||
new. This only affects sqlite databases since they were the only ones
|
||||
supported at the time.
|
||||
"""
|
||||
with db_conn:
|
||||
schema_path = os.path.join(
|
||||
dir_path, "schema", "schema_version.sql",
|
||||
)
|
||||
create_schema = read_schema(schema_path)
|
||||
db_conn.executescript(create_schema)
|
||||
|
||||
c = db_conn.execute("SELECT * FROM schema_version")
|
||||
rows = c.fetchall()
|
||||
c.close()
|
||||
|
||||
if not rows:
|
||||
c = db_conn.execute("PRAGMA user_version")
|
||||
row = c.fetchone()
|
||||
c.close()
|
||||
|
||||
if row and row[0]:
|
||||
db_conn.execute(
|
||||
"REPLACE INTO schema_version (version, upgraded)"
|
||||
" VALUES (?,?)",
|
||||
(row[0], False)
|
||||
)
|
||||
|
|
|
@ -18,7 +18,7 @@ import logging
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_upgrade(cur, *args, **kwargs):
|
||||
def run_create(cur, *args, **kwargs):
|
||||
cur.execute("SELECT id, regex FROM application_services_regex")
|
||||
for row in cur.fetchall():
|
||||
try:
|
||||
|
@ -35,3 +35,7 @@ def run_upgrade(cur, *args, **kwargs):
|
|||
"UPDATE application_services_regex SET regex=? WHERE id=?",
|
||||
(new_regex, row[0])
|
||||
)
|
||||
|
||||
|
||||
def run_upgrade(*args, **kwargs):
|
||||
pass
|
||||
|
|
|
@ -27,7 +27,7 @@ import logging
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_upgrade(cur, database_engine, *args, **kwargs):
|
||||
def run_create(cur, database_engine, *args, **kwargs):
|
||||
logger.info("Porting pushers table...")
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS pushers2 (
|
||||
|
@ -74,3 +74,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
|
|||
cur.execute("DROP TABLE pushers")
|
||||
cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
|
||||
logger.info("Moved %d pushers to new table", count)
|
||||
|
||||
|
||||
def run_upgrade(*args, **kwargs):
|
||||
pass
|
||||
|
|
|
@ -43,7 +43,7 @@ SQLITE_TABLE = (
|
|||
)
|
||||
|
||||
|
||||
def run_upgrade(cur, database_engine, *args, **kwargs):
|
||||
def run_create(cur, database_engine, *args, **kwargs):
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
for statement in get_statements(POSTGRES_TABLE.splitlines()):
|
||||
cur.execute(statement)
|
||||
|
@ -76,3 +76,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
|
|||
sql = database_engine.convert_param_style(sql)
|
||||
|
||||
cur.execute(sql, ("event_search", progress_json))
|
||||
|
||||
|
||||
def run_upgrade(*args, **kwargs):
|
||||
pass
|
||||
|
|
|
@ -27,7 +27,7 @@ ALTER_TABLE = (
|
|||
)
|
||||
|
||||
|
||||
def run_upgrade(cur, database_engine, *args, **kwargs):
|
||||
def run_create(cur, database_engine, *args, **kwargs):
|
||||
for statement in get_statements(ALTER_TABLE.splitlines()):
|
||||
cur.execute(statement)
|
||||
|
||||
|
@ -55,3 +55,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
|
|||
sql = database_engine.convert_param_style(sql)
|
||||
|
||||
cur.execute(sql, ("event_origin_server_ts", progress_json))
|
||||
|
||||
|
||||
def run_upgrade(*args, **kwargs):
|
||||
pass
|
||||
|
|
|
@ -18,7 +18,7 @@ from synapse.storage.appservice import ApplicationServiceStore
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_upgrade(cur, database_engine, config, *args, **kwargs):
|
||||
def run_create(cur, database_engine, *args, **kwargs):
|
||||
# NULL indicates user was not registered by an appservice.
|
||||
try:
|
||||
cur.execute("ALTER TABLE users ADD COLUMN appservice_id TEXT")
|
||||
|
@ -26,6 +26,8 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
|
|||
# Maybe we already added the column? Hope so...
|
||||
pass
|
||||
|
||||
|
||||
def run_upgrade(cur, database_engine, config, *args, **kwargs):
|
||||
cur.execute("SELECT name FROM users")
|
||||
rows = cur.fetchall()
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
|
|||
"test",
|
||||
db_pool=self.db_pool,
|
||||
config=config,
|
||||
database_engine=create_engine(config),
|
||||
database_engine=create_engine(config.database_config),
|
||||
)
|
||||
|
||||
self.datastore = SQLBaseStore(hs)
|
||||
|
|
|
@ -64,7 +64,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
|
|||
hs = HomeServer(
|
||||
name, db_pool=db_pool, config=config,
|
||||
version_string="Synapse/tests",
|
||||
database_engine=create_engine(config),
|
||||
database_engine=create_engine(config.database_config),
|
||||
get_db_conn=db_pool.get_db_conn,
|
||||
**kargs
|
||||
)
|
||||
|
@ -73,7 +73,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
|
|||
hs = HomeServer(
|
||||
name, db_pool=None, datastore=datastore, config=config,
|
||||
version_string="Synapse/tests",
|
||||
database_engine=create_engine(config),
|
||||
database_engine=create_engine(config.database_config),
|
||||
**kargs
|
||||
)
|
||||
|
||||
|
@ -298,7 +298,7 @@ class SQLiteMemoryDbPool(ConnectionPool, object):
|
|||
return conn
|
||||
|
||||
def create_engine(self):
|
||||
return create_engine(self.config)
|
||||
return create_engine(self.config.database_config)
|
||||
|
||||
|
||||
class MemoryDataStore(object):
|
||||
|
|
Loading…
Reference in a new issue