diff --git a/changelog.d/15192.misc b/changelog.d/15192.misc new file mode 100644 index 000000000..107668687 --- /dev/null +++ b/changelog.d/15192.misc @@ -0,0 +1 @@ +Combine `AbstractStreamIdTracker` and `AbstractStreamIdGenerator`. diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 0dd15f16f..5503621ad 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -52,7 +52,6 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.types import Cursor from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, - AbstractStreamIdTracker, StreamIdGenerator, ) from synapse.types import JsonDict, StrCollection, get_verify_key_from_cross_signing_key @@ -91,7 +90,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. - self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + self._device_list_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), "device_lists_stream", @@ -712,9 +711,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): The new stream ID. """ - # TODO: this looks like it's _writing_. Should this be on DeviceStore rather - # than DeviceWorkerStore? - async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined] + async with self._device_list_id_gen.get_next() as stream_id: await self.db_pool.runInteraction( "add_user_sig_change_to_streams", self._add_user_signature_change_txn, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index b7e749812..20b7a6836 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -72,7 +72,6 @@ from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, - AbstractStreamIdTracker, MultiWriterIdGenerator, StreamIdGenerator, ) @@ -187,8 +186,8 @@ class EventsWorkerStore(SQLBaseStore): ): super().__init__(database, db_conn, hs) - self._stream_id_gen: AbstractStreamIdTracker - self._backfill_id_gen: AbstractStreamIdTracker + self._stream_id_gen: AbstractStreamIdGenerator + self._backfill_id_gen: AbstractStreamIdGenerator if isinstance(database.engine, PostgresEngine): # If we're using Postgres than we can use `MultiWriterIdGenerator` # regardless of whether this process writes to the streams or not. diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 9b2bbe060..9f862f00c 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -46,7 +46,6 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, - AbstractStreamIdTracker, IdGenerator, StreamIdGenerator, ) @@ -118,7 +117,7 @@ class PushRulesWorkerStore( # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. - self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + self._push_rules_stream_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), "push_rules_stream", diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index fddbc07af..9a24f7a65 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -36,7 +36,6 @@ from synapse.storage.database import ( ) from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, - AbstractStreamIdTracker, StreamIdGenerator, ) from synapse.types import JsonDict @@ -60,7 +59,7 @@ class PusherWorkerStore(SQLBaseStore): # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. - self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + self._pushers_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), "pushers", diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 92a82240a..074942b16 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -39,7 +39,7 @@ from synapse.storage.database import ( from synapse.storage.engines import PostgresEngine from synapse.storage.engines._base import IsolationLevel from synapse.storage.util.id_generators import ( - AbstractStreamIdTracker, + AbstractStreamIdGenerator, MultiWriterIdGenerator, StreamIdGenerator, ) @@ -65,7 +65,7 @@ class ReceiptsWorkerStore(SQLBaseStore): # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. - self._receipts_id_gen: AbstractStreamIdTracker + self._receipts_id_gen: AbstractStreamIdGenerator if isinstance(database.engine, PostgresEngine): self._can_write_to_receipts = ( @@ -768,7 +768,7 @@ class ReceiptsWorkerStore(SQLBaseStore): "insert_receipt_conv", self._graph_to_linear, room_id, event_ids ) - async with self._receipts_id_gen.get_next() as stream_id: # type: ignore[attr-defined] + async with self._receipts_id_gen.get_next() as stream_id: event_ts = await self.db_pool.runInteraction( "insert_linearized_receipt", self._insert_linearized_receipt_txn, diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 334d3d718..d2c874b9a 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -93,8 +93,11 @@ def _load_current_id( return res -class AbstractStreamIdTracker(metaclass=abc.ABCMeta): - """Tracks the "current" stream ID of a stream that may have multiple writers. +class AbstractStreamIdGenerator(metaclass=abc.ABCMeta): + """Generates or tracks stream IDs for a stream that may have multiple writers. + + Each stream ID represents a write transaction, whose completion is tracked + so that the "current" stream ID of the stream can be determined. Stream IDs are monotonically increasing or decreasing integers representing write transactions. The "current" stream ID is the stream ID such that all transactions @@ -130,16 +133,6 @@ class AbstractStreamIdTracker(metaclass=abc.ABCMeta): """ raise NotImplementedError() - -class AbstractStreamIdGenerator(AbstractStreamIdTracker): - """Generates stream IDs for a stream that may have multiple writers. - - Each stream ID represents a write transaction, whose completion is tracked - so that the "current" stream ID of the stream can be determined. - - See `AbstractStreamIdTracker` for more details. - """ - @abc.abstractmethod def get_next(self) -> AsyncContextManager[int]: """