0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2025-01-07 13:53:53 +01:00

Faster remote room joins: invalidate caches and unblock requests when receiving un-partial-stated event notifications over replication. [rei:frrj/streams/unpsr] (#14546)

This commit is contained in:
reivilibre 2022-12-19 14:57:51 +00:00 committed by GitHub
parent adbf0cffc4
commit 2888d7ec83
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 14 deletions

1
changelog.d/14546.misc Normal file
View file

@ -0,0 +1 @@
Faster remote room joins: stream the un-partial-stating of events over replication.

View file

@ -36,6 +36,7 @@ from synapse.replication.tcp.streams import (
TagAccountDataStream, TagAccountDataStream,
ToDeviceStream, ToDeviceStream,
TypingStream, TypingStream,
UnPartialStatedEventStream,
UnPartialStatedRoomStream, UnPartialStatedRoomStream,
) )
from synapse.replication.tcp.streams.events import ( from synapse.replication.tcp.streams.events import (
@ -43,7 +44,10 @@ from synapse.replication.tcp.streams.events import (
EventsStreamEventRow, EventsStreamEventRow,
EventsStreamRow, EventsStreamRow,
) )
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow from synapse.replication.tcp.streams.partial_state import (
UnPartialStatedEventStreamRow,
UnPartialStatedRoomStreamRow,
)
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.async_helpers import Linearizer, timeout_deferred
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -247,6 +251,14 @@ class ReplicationDataHandler:
self._state_storage_controller.notify_room_un_partial_stated( self._state_storage_controller.notify_room_un_partial_stated(
row.room_id row.room_id
) )
elif stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)
# Wake up any tasks waiting for the event to be un-partial-stated.
self._state_storage_controller.notify_event_un_partial_stated(
row.event_id
)
await self._presence_handler.process_replication_rows( await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows stream_name, instance_name, token, rows

View file

@ -59,8 +59,9 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process, run_as_background_process,
wrap_as_background_process, wrap_as_background_process,
) )
from synapse.replication.tcp.streams import BackfillStream from synapse.replication.tcp.streams import BackfillStream, UnPartialStatedEventStream
from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import ( from synapse.storage.database import (
DatabasePool, DatabasePool,
@ -391,6 +392,16 @@ class EventsWorkerStore(SQLBaseStore):
self._stream_id_gen.advance(instance_name, token) self._stream_id_gen.advance(instance_name, token)
elif stream_name == BackfillStream.NAME: elif stream_name == BackfillStream.NAME:
self._backfill_id_gen.advance(instance_name, -token) self._backfill_id_gen.advance(instance_name, -token)
elif stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)
self.is_partial_state_event.invalidate((row.event_id,))
if row.rejection_status_changed:
# If the partial-stated event became rejected or unrejected
# when it wasn't before, we need to invalidate this cache.
self._invalidate_local_get_event_cache(row.event_id)
super().process_replication_rows(stream_name, instance_name, token, rows) super().process_replication_rows(stream_name, instance_name, token, rows)
@ -2380,6 +2391,9 @@ class EventsWorkerStore(SQLBaseStore):
This can happen, for example, when resyncing state during a faster join. This can happen, for example, when resyncing state during a faster join.
It is the caller's responsibility to ensure that other workers are
sent a notification so that they call `_invalidate_local_get_event_cache()`.
Args: Args:
txn: txn:
event_id: ID of event to update event_id: ID of event to update
@ -2418,14 +2432,3 @@ class EventsWorkerStore(SQLBaseStore):
) )
self.invalidate_get_event_cache_after_txn(txn, event_id) self.invalidate_get_event_cache_after_txn(txn, event_id)
# TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
# call '_send_invalidation_to_replication', but we actually need the other
# end to call _invalidate_local_get_event_cache() rather than (just)
# _get_event_cache.invalidate().
#
# One solution might be to (somehow) get the workers to call
# _invalidate_caches_for_event() (though that will invalidate more than
# strictly necessary).
#
# https://github.com/matrix-org/synapse/issues/12994

View file

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
import collections.abc import collections.abc
import logging import logging
from typing import TYPE_CHECKING, Collection, Dict, Iterable, Optional, Set, Tuple from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Set, Tuple
import attr import attr
@ -24,6 +24,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase from synapse.events import EventBase
from synapse.events.snapshot import EventContext from synapse.events.snapshot import EventContext
from synapse.logging.opentracing import trace from synapse.logging.opentracing import trace
from synapse.replication.tcp.streams import UnPartialStatedEventStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.database import ( from synapse.storage.database import (
DatabasePool, DatabasePool,
@ -82,6 +84,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
super().__init__(database, db_conn, hs) super().__init__(database, db_conn, hs)
self._instance_name: str = hs.get_instance_name() self._instance_name: str = hs.get_instance_name()
def process_replication_rows(
self,
stream_name: str,
instance_name: str,
token: int,
rows: Iterable[Any],
) -> None:
if stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)
self._get_state_group_for_event.invalidate((row.event_id,))
super().process_replication_rows(stream_name, instance_name, token, rows)
async def get_room_version(self, room_id: str) -> RoomVersion: async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room """Get the room_version of a given room
Raises: Raises: