0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2025-01-19 02:32:06 +01:00

Make synapse_port_db correctly create indexes (#6102)

Make `synapse_port_db` correctly create indexes in the PostgreSQL database, by having it run the background updates on the database before migrating the data.

To ensure we're migrating the right data, also block the port if the SQLite3 database still has pending or ongoing background updates.

Fixes #4877
This commit is contained in:
Brendan Abolivier 2019-10-23 15:31:59 +01:00 committed by GitHub
parent 409c62b27b
commit c97ed64db3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 133 additions and 54 deletions

1
changelog.d/6102.bugfix Normal file
View file

@ -0,0 +1 @@
Make the `synapse_port_db` script create the right indexes on a new PostgreSQL database.

View file

@ -2,6 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd # Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd # Copyright 2018 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -29,9 +30,23 @@ import yaml
from twisted.enterprise import adbapi from twisted.enterprise import adbapi
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from synapse.storage._base import LoggingTransaction, SQLBaseStore from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import PreserveLoggingContext
from synapse.storage._base import LoggingTransaction
from synapse.storage.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.deviceinbox import DeviceInboxBackgroundUpdateStore
from synapse.storage.devices import DeviceBackgroundUpdateStore
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.storage.events_bg_updates import EventsBackgroundUpdatesStore
from synapse.storage.media_repository import MediaRepositoryBackgroundUpdateStore
from synapse.storage.prepare_database import prepare_database from synapse.storage.prepare_database import prepare_database
from synapse.storage.registration import RegistrationBackgroundUpdateStore
from synapse.storage.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.search import SearchBackgroundUpdateStore
from synapse.storage.state import StateBackgroundUpdateStore
from synapse.storage.stats import StatsStore
from synapse.storage.user_directory import UserDirectoryBackgroundUpdateStore
from synapse.util import Clock
logger = logging.getLogger("synapse_port_db") logger = logging.getLogger("synapse_port_db")
@ -98,33 +113,24 @@ APPEND_ONLY_TABLES = [
end_error_exec_info = None end_error_exec_info = None
class Store(object): class Store(
"""This object is used to pull out some of the convenience API from the ClientIpBackgroundUpdateStore,
Storage layer. DeviceInboxBackgroundUpdateStore,
DeviceBackgroundUpdateStore,
*All* database interactions should go through this object. EventsBackgroundUpdatesStore,
""" MediaRepositoryBackgroundUpdateStore,
RegistrationBackgroundUpdateStore,
def __init__(self, db_pool, engine): RoomMemberBackgroundUpdateStore,
self.db_pool = db_pool SearchBackgroundUpdateStore,
self.database_engine = engine StateBackgroundUpdateStore,
UserDirectoryBackgroundUpdateStore,
_simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"] StatsStore,
_simple_insert = SQLBaseStore.__dict__["_simple_insert"] ):
def __init__(self, db_conn, hs):
_simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"] super().__init__(db_conn, hs)
_simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"] self.db_pool = hs.get_db_pool()
_simple_select_one = SQLBaseStore.__dict__["_simple_select_one"]
_simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"]
_simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
_simple_select_one_onecol_txn = SQLBaseStore.__dict__[
"_simple_select_one_onecol_txn"
]
_simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
_simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
_simple_update_txn = SQLBaseStore.__dict__["_simple_update_txn"]
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs): def runInteraction(self, desc, func, *args, **kwargs):
def r(conn): def r(conn):
try: try:
@ -150,7 +156,8 @@ class Store(object):
logger.debug("[TXN FAIL] {%s} %s", desc, e) logger.debug("[TXN FAIL] {%s} %s", desc, e)
raise raise
return self.db_pool.runWithConnection(r) with PreserveLoggingContext():
return (yield self.db_pool.runWithConnection(r))
def execute(self, f, *args, **kwargs): def execute(self, f, *args, **kwargs):
return self.runInteraction(f.__name__, f, *args, **kwargs) return self.runInteraction(f.__name__, f, *args, **kwargs)
@ -176,6 +183,25 @@ class Store(object):
raise raise
class MockHomeserver:
def __init__(self, config, database_engine, db_conn, db_pool):
self.database_engine = database_engine
self.db_conn = db_conn
self.db_pool = db_pool
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server_name
def get_db_conn(self):
return self.db_conn
def get_db_pool(self):
return self.db_pool
def get_clock(self):
return self.clock
class Porter(object): class Porter(object):
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.__dict__.update(kwargs) self.__dict__.update(kwargs)
@ -447,31 +473,75 @@ class Porter(object):
db_conn.commit() db_conn.commit()
return db_conn
@defer.inlineCallbacks
def build_db_store(self, config):
"""Builds and returns a database store using the provided configuration.
Args:
config: The database configuration, i.e. a dict following the structure of
the "database" section of Synapse's configuration file.
Returns:
The built Store object.
"""
engine = create_engine(config)
self.progress.set_state("Preparing %s" % config["name"])
conn = self.setup_db(config, engine)
db_pool = adbapi.ConnectionPool(
config["name"], **config["args"]
)
hs = MockHomeserver(self.hs_config, engine, conn, db_pool)
store = Store(conn, hs)
yield store.runInteraction(
"%s_engine.check_database" % config["name"],
engine.check_database,
)
return store
@defer.inlineCallbacks
def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
postgres_ready = yield self.postgres_store.has_completed_background_updates()
if not postgres_ready:
# Only say that we're running background updates when there are background
# updates to run.
self.progress.set_state("Running background updates on PostgreSQL")
while not postgres_ready:
yield self.postgres_store.do_next_background_update(100)
postgres_ready = yield (
self.postgres_store.has_completed_background_updates()
)
@defer.inlineCallbacks @defer.inlineCallbacks
def run(self): def run(self):
try: try:
sqlite_db_pool = adbapi.ConnectionPool( self.sqlite_store = yield self.build_db_store(self.sqlite_config)
self.sqlite_config["name"], **self.sqlite_config["args"]
# Check if all background updates are done, abort if not.
updates_complete = yield self.sqlite_store.has_completed_background_updates()
if not updates_complete:
sys.stderr.write(
"Pending background updates exist in the SQLite3 database."
" Please start Synapse again and wait until every update has finished"
" before running this script.\n"
)
defer.returnValue(None)
self.postgres_store = yield self.build_db_store(
self.hs_config.database_config
) )
postgres_db_pool = adbapi.ConnectionPool( yield self.run_background_updates_on_postgres()
self.postgres_config["name"], **self.postgres_config["args"]
)
sqlite_engine = create_engine(sqlite_config)
postgres_engine = create_engine(postgres_config)
self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
self.postgres_store = Store(postgres_db_pool, postgres_engine)
yield self.postgres_store.execute(postgres_engine.check_database)
# Step 1. Set up databases.
self.progress.set_state("Preparing SQLite3")
self.setup_db(sqlite_config, sqlite_engine)
self.progress.set_state("Preparing PostgreSQL")
self.setup_db(postgres_config, postgres_engine)
self.progress.set_state("Creating port tables") self.progress.set_state("Creating port tables")
@ -563,6 +633,8 @@ class Porter(object):
def conv(j, col): def conv(j, col):
if j in bool_cols: if j in bool_cols:
return bool(col) return bool(col)
if isinstance(col, bytes):
return bytearray(col)
elif isinstance(col, string_types) and "\0" in col: elif isinstance(col, string_types) and "\0" in col:
logger.warn( logger.warn(
"DROPPING ROW: NUL value in table %s col %s: %r", "DROPPING ROW: NUL value in table %s col %s: %r",
@ -926,18 +998,24 @@ if __name__ == "__main__":
}, },
} }
postgres_config = yaml.safe_load(args.postgres_config) hs_config = yaml.safe_load(args.postgres_config)
if "database" in postgres_config: if "database" not in hs_config:
postgres_config = postgres_config["database"] sys.stderr.write("The configuration file must have a 'database' section.\n")
sys.exit(4)
postgres_config = hs_config["database"]
if "name" not in postgres_config: if "name" not in postgres_config:
sys.stderr.write("Malformed database config: no 'name'") sys.stderr.write("Malformed database config: no 'name'\n")
sys.exit(2) sys.exit(2)
if postgres_config["name"] != "psycopg2": if postgres_config["name"] != "psycopg2":
sys.stderr.write("Database must use 'psycopg2' connector.") sys.stderr.write("Database must use the 'psycopg2' connector.\n")
sys.exit(3) sys.exit(3)
config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")
def start(stdscr=None): def start(stdscr=None):
if stdscr: if stdscr:
progress = CursesProgress(stdscr) progress = CursesProgress(stdscr)
@ -946,9 +1024,9 @@ if __name__ == "__main__":
porter = Porter( porter = Porter(
sqlite_config=sqlite_config, sqlite_config=sqlite_config,
postgres_config=postgres_config,
progress=progress, progress=progress,
batch_size=args.batch_size, batch_size=args.batch_size,
hs_config=config,
) )
reactor.callWhenRunning(porter.run) reactor.callWhenRunning(porter.run)