Include newly added sequences in the port DB script. (#9449)

And ensure the consistency of `event_auth_chain_id`.
This commit is contained in:
Patrick Cloke 2021-02-23 07:33:24 -05:00 committed by GitHub
parent 66f4949e7f
commit 65a9eb8994
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 26 deletions

1
changelog.d/9449.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a bug introduced in v1.26.0 where some sequences were not properly configured when running `synapse_port_db`.

View file

@ -22,7 +22,7 @@ import logging
import sys
import time
import traceback
from typing import Dict, Optional, Set
from typing import Dict, Iterable, Optional, Set
import yaml
@ -629,7 +629,13 @@ class Porter(object):
await self._setup_state_group_id_seq()
await self._setup_user_id_seq()
await self._setup_events_stream_seqs()
await self._setup_device_inbox_seq()
await self._setup_sequence(
"device_inbox_sequence", ("device_inbox", "device_federation_outbox")
)
await self._setup_sequence(
"account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
await self._setup_auth_chain_sequence()
# Step 3. Get tables.
self.progress.set_state("Fetching tables")
@ -854,7 +860,7 @@ class Porter(object):
return done, remaining + done
async def _setup_state_group_id_seq(self):
async def _setup_state_group_id_seq(self) -> None:
curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
)
@ -868,7 +874,7 @@ class Porter(object):
await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
async def _setup_user_id_seq(self):
async def _setup_user_id_seq(self) -> None:
curr_id = await self.sqlite_store.db_pool.runInteraction(
"setup_user_id_seq", find_max_generated_user_id_localpart
)
@ -877,9 +883,9 @@ class Porter(object):
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
async def _setup_events_stream_seqs(self):
async def _setup_events_stream_seqs(self) -> None:
"""Set the event stream sequences to the correct values.
"""
@ -908,35 +914,46 @@ class Porter(object):
(curr_backward_id + 1,),
)
return await self.postgres_store.db_pool.runInteraction(
await self.postgres_store.db_pool.runInteraction(
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
)
async def _setup_device_inbox_seq(self):
"""Set the device inbox sequence to the correct value.
async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None:
"""Set a sequence to the correct value.
"""
curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="device_inbox",
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)
current_stream_ids = []
for stream_id_table in stream_id_tables:
max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table=stream_id_table,
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)
current_stream_ids.append(max_stream_id)
curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="device_federation_outbox",
keyvalues={},
retcol="COALESCE(MAX(stream_id), 1)",
allow_none=True,
)
next_id = max(current_stream_ids) + 1
next_id = max(curr_local_id, curr_federation_id) + 1
def r(txn):
sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, )
txn.execute(sql + " %s", (next_id, ))
await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r)
async def _setup_auth_chain_sequence(self) -> None:
curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True
)
def r(txn):
txn.execute(
"ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,)
"ALTER SEQUENCE event_auth_chain_id RESTART WITH %s",
(curr_chain_id,),
)
return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r)
await self.postgres_store.db_pool.runInteraction(
"_setup_event_auth_chain_id", r,
)
##############################################

View file

@ -79,7 +79,7 @@ class Databases:
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main)
persist_events = PersistEventsStore(hs, database, main, db_conn)
if "state" in database_config.databases:
logger.info(

View file

@ -42,6 +42,7 @@ from synapse.logging.utils import log_function
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import StateMap, get_domain_from_id
from synapse.util import json_encoder
@ -90,7 +91,11 @@ class PersistEventsStore:
"""
def __init__(
self, hs: "HomeServer", db: DatabasePool, main_data_store: "DataStore"
self,
hs: "HomeServer",
db: DatabasePool,
main_data_store: "DataStore",
db_conn: Connection,
):
self.hs = hs
self.db_pool = db
@ -109,6 +114,12 @@ class PersistEventsStore:
) # type: MultiWriterIdGenerator
self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator
# The consistency of this cannot be checked when the ID generator is
# created since the database might not yet be up-to-date.
self.db_pool.event_chain_id_gen.check_consistency(
db_conn, "event_auth_chains", "chain_id" # type: ignore
)
# This should only exist on instances that are configured to write
assert (
hs.get_instance_name() in hs.config.worker.writers.events