0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2025-01-22 08:30:07 +01:00

Merge remote-tracking branch 'origin/release-v1.26.0' into develop

This commit is contained in:
Erik Johnston 2021-01-21 15:09:30 +00:00
commit b249f002b8
5 changed files with 62 additions and 7 deletions

1
changelog.d/9193.bugfix Normal file
View file

@ -0,0 +1 @@
Fix receipts or account data not being sent down sync. Introduced in v1.26.0rc1.

View file

@ -68,7 +68,7 @@ class AccountDataWorkerStore(SQLBaseStore):
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for # updated over replication. (Multiple writers are not supported for
# SQLite). # SQLite).
if hs.get_instance_name() in hs.config.worker.writers.events: if hs.get_instance_name() in hs.config.worker.writers.account_data:
self._account_data_id_gen = StreamIdGenerator( self._account_data_id_gen = StreamIdGenerator(
db_conn, db_conn,
"room_account_data", "room_account_data",

View file

@ -45,7 +45,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
self._receipts_id_gen = MultiWriterIdGenerator( self._receipts_id_gen = MultiWriterIdGenerator(
db_conn=db_conn, db_conn=db_conn,
db=database, db=database,
stream_name="account_data", stream_name="receipts",
instance_name=self._instance_name, instance_name=self._instance_name,
tables=[("receipts_linearized", "instance_name", "stream_id")], tables=[("receipts_linearized", "instance_name", "stream_id")],
sequence_name="receipts_sequence", sequence_name="receipts_sequence",
@ -61,7 +61,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for # updated over replication. (Multiple writers are not supported for
# SQLite). # SQLite).
if hs.get_instance_name() in hs.config.worker.writers.events: if hs.get_instance_name() in hs.config.worker.writers.receipts:
self._receipts_id_gen = StreamIdGenerator( self._receipts_id_gen = StreamIdGenerator(
db_conn, "receipts_linearized", "stream_id" db_conn, "receipts_linearized", "stream_id"
) )

View file

@ -261,7 +261,11 @@ class MultiWriterIdGenerator:
# We check that the table and sequence haven't diverged. # We check that the table and sequence haven't diverged.
for table, _, id_column in tables: for table, _, id_column in tables:
self._sequence_gen.check_consistency( self._sequence_gen.check_consistency(
db_conn, table=table, id_column=id_column, positive=positive db_conn,
table=table,
id_column=id_column,
stream_name=stream_name,
positive=positive,
) )
# This goes and fills out the above state from the database. # This goes and fills out the above state from the database.

View file

@ -45,6 +45,21 @@ and run the following SQL:
See docs/postgres.md for more information. See docs/postgres.md for more information.
""" """
_INCONSISTENT_STREAM_ERROR = """
Postgres sequence '%(seq)s' is inconsistent with associated stream position
of '%(stream_name)s' in the 'stream_positions' table.
This is likely a programming error and should be reported at
https://github.com/matrix-org/synapse.
A temporary workaround to fix this error is to shut down Synapse (including
any and all workers) and run the following SQL:
DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s';
This will need to be done every time the server is restarted.
"""
class SequenceGenerator(metaclass=abc.ABCMeta): class SequenceGenerator(metaclass=abc.ABCMeta):
"""A class which generates a unique sequence of integers""" """A class which generates a unique sequence of integers"""
@ -60,14 +75,20 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
db_conn: "LoggingDatabaseConnection", db_conn: "LoggingDatabaseConnection",
table: str, table: str,
id_column: str, id_column: str,
stream_name: Optional[str] = None,
positive: bool = True, positive: bool = True,
): ):
"""Should be called during start up to test that the current value of """Should be called during start up to test that the current value of
the sequence is greater than or equal to the maximum ID in the table. the sequence is greater than or equal to the maximum ID in the table.
This is to handle various cases where the sequence value can get out This is to handle various cases where the sequence value can get out of
of sync with the table, e.g. if Synapse gets rolled back to a previous sync with the table, e.g. if Synapse gets rolled back to a previous
version and the rolled forwards again. version and the rolled forwards again.
If a stream name is given then this will check that any value in the
`stream_positions` table is less than or equal to the current sequence
value. If it isn't then it's likely that streams have been crossed
somewhere (e.g. two ID generators have the same stream name).
""" """
... ...
@ -93,8 +114,12 @@ class PostgresSequenceGenerator(SequenceGenerator):
db_conn: "LoggingDatabaseConnection", db_conn: "LoggingDatabaseConnection",
table: str, table: str,
id_column: str, id_column: str,
stream_name: Optional[str] = None,
positive: bool = True, positive: bool = True,
): ):
"""See SequenceGenerator.check_consistency for docstring.
"""
txn = db_conn.cursor(txn_name="sequence.check_consistency") txn = db_conn.cursor(txn_name="sequence.check_consistency")
# First we get the current max ID from the table. # First we get the current max ID from the table.
@ -118,6 +143,18 @@ class PostgresSequenceGenerator(SequenceGenerator):
"SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name} "SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name}
) )
last_value, is_called = txn.fetchone() last_value, is_called = txn.fetchone()
# If we have an associated stream check the stream_positions table.
max_in_stream_positions = None
if stream_name:
txn.execute(
"SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?",
(stream_name,),
)
row = txn.fetchone()
if row:
max_in_stream_positions = row[0]
txn.close() txn.close()
# If `is_called` is False then `last_value` is actually the value that # If `is_called` is False then `last_value` is actually the value that
@ -138,6 +175,14 @@ class PostgresSequenceGenerator(SequenceGenerator):
% {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql} % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
) )
# If we have values in the stream positions table then they have to be
# less than or equal to `last_value`
if max_in_stream_positions and max_in_stream_positions > last_value:
raise IncorrectDatabaseSetup(
_INCONSISTENT_STREAM_ERROR
% {"seq": self._sequence_name, "stream": stream_name}
)
GetFirstCallbackType = Callable[[Cursor], int] GetFirstCallbackType = Callable[[Cursor], int]
@ -175,7 +220,12 @@ class LocalSequenceGenerator(SequenceGenerator):
return self._current_max_id return self._current_max_id
def check_consistency( def check_consistency(
self, db_conn: Connection, table: str, id_column: str, positive: bool = True self,
db_conn: Connection,
table: str,
id_column: str,
stream_name: Optional[str] = None,
positive: bool = True,
): ):
# There is nothing to do for in memory sequences # There is nothing to do for in memory sequences
pass pass