mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-12 04:52:26 +01:00
Fix bug in sliding sync when using old DB. (#17398)
We don't necessarily have `instance_name` for old events (before we support multiple event persisters). We treat those as if the `instance_name` was "master". --------- Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
This commit is contained in:
parent
3fef535ff2
commit
8cdd2d214e
8 changed files with 33 additions and 212 deletions
1
changelog.d/17398.bugfix
Normal file
1
changelog.d/17398.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix bug in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint when using an old database.
|
|
@ -119,9 +119,6 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||||
self._attempt_to_invalidate_cache(
|
self._attempt_to_invalidate_cache(
|
||||||
"get_user_in_room_with_profile", (room_id, user_id)
|
"get_user_in_room_with_profile", (room_id, user_id)
|
||||||
)
|
)
|
||||||
self._attempt_to_invalidate_cache(
|
|
||||||
"get_rooms_for_user_with_stream_ordering", (user_id,)
|
|
||||||
)
|
|
||||||
self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
|
self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
|
||||||
|
|
||||||
# Purge other caches based on room state.
|
# Purge other caches based on room state.
|
||||||
|
@ -148,9 +145,6 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||||
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
|
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
|
||||||
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
|
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
|
||||||
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
|
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
|
||||||
self._attempt_to_invalidate_cache(
|
|
||||||
"get_rooms_for_user_with_stream_ordering", None
|
|
||||||
)
|
|
||||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||||
|
|
||||||
|
|
|
@ -268,16 +268,12 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||||
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
|
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
|
||||||
|
|
||||||
if data.type == EventTypes.Member:
|
if data.type == EventTypes.Member:
|
||||||
self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined]
|
|
||||||
(data.state_key,)
|
|
||||||
)
|
|
||||||
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
|
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
|
||||||
elif row.type == EventsStreamAllStateRow.TypeId:
|
elif row.type == EventsStreamAllStateRow.TypeId:
|
||||||
assert isinstance(data, EventsStreamAllStateRow)
|
assert isinstance(data, EventsStreamAllStateRow)
|
||||||
# Similar to the above, but the entire caches are invalidated. This is
|
# Similar to the above, but the entire caches are invalidated. This is
|
||||||
# unfortunate for the membership caches, but should recover quickly.
|
# unfortunate for the membership caches, but should recover quickly.
|
||||||
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
|
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
|
||||||
self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined]
|
|
||||||
self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined]
|
self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined]
|
||||||
else:
|
else:
|
||||||
raise Exception("Unknown events stream row type %s" % (row.type,))
|
raise Exception("Unknown events stream row type %s" % (row.type,))
|
||||||
|
@ -334,9 +330,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||||
self._attempt_to_invalidate_cache(
|
self._attempt_to_invalidate_cache(
|
||||||
"get_invited_rooms_for_local_user", (state_key,)
|
"get_invited_rooms_for_local_user", (state_key,)
|
||||||
)
|
)
|
||||||
self._attempt_to_invalidate_cache(
|
|
||||||
"get_rooms_for_user_with_stream_ordering", (state_key,)
|
|
||||||
)
|
|
||||||
self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
|
self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
|
||||||
|
|
||||||
self._attempt_to_invalidate_cache(
|
self._attempt_to_invalidate_cache(
|
||||||
|
@ -399,9 +392,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||||
self._attempt_to_invalidate_cache("get_thread_id", None)
|
self._attempt_to_invalidate_cache("get_thread_id", None)
|
||||||
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
|
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
|
||||||
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
|
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
|
||||||
self._attempt_to_invalidate_cache(
|
|
||||||
"get_rooms_for_user_with_stream_ordering", None
|
|
||||||
)
|
|
||||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||||
self._attempt_to_invalidate_cache("did_forget", None)
|
self._attempt_to_invalidate_cache("did_forget", None)
|
||||||
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
||||||
|
|
|
@ -1457,7 +1457,8 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
event_dict[event_id] = _EventRow(
|
event_dict[event_id] = _EventRow(
|
||||||
event_id=event_id,
|
event_id=event_id,
|
||||||
stream_ordering=row[1],
|
stream_ordering=row[1],
|
||||||
instance_name=row[2],
|
# If instance_name is null we default to "master"
|
||||||
|
instance_name=row[2] or "master",
|
||||||
internal_metadata=row[3],
|
internal_metadata=row[3],
|
||||||
json=row[4],
|
json=row[4],
|
||||||
format_version=row[5],
|
format_version=row[5],
|
||||||
|
|
|
@ -50,12 +50,7 @@ from synapse.storage.database import (
|
||||||
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
||||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||||
from synapse.storage.engines import Sqlite3Engine
|
from synapse.storage.engines import Sqlite3Engine
|
||||||
from synapse.storage.roommember import (
|
from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser
|
||||||
GetRoomsForUserWithStreamOrdering,
|
|
||||||
MemberSummary,
|
|
||||||
ProfileInfo,
|
|
||||||
RoomsForUser,
|
|
||||||
)
|
|
||||||
from synapse.types import (
|
from synapse.types import (
|
||||||
JsonDict,
|
JsonDict,
|
||||||
PersistedEventPosition,
|
PersistedEventPosition,
|
||||||
|
@ -494,7 +489,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||||
sender=sender,
|
sender=sender,
|
||||||
membership=membership,
|
membership=membership,
|
||||||
event_id=event_id,
|
event_id=event_id,
|
||||||
event_pos=PersistedEventPosition(instance_name, stream_ordering),
|
event_pos=PersistedEventPosition(
|
||||||
|
# If instance_name is null we default to "master"
|
||||||
|
instance_name or "master",
|
||||||
|
stream_ordering,
|
||||||
|
),
|
||||||
room_version_id=room_version,
|
room_version_id=room_version,
|
||||||
)
|
)
|
||||||
for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
|
for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
|
||||||
|
@ -606,53 +605,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
@cached(max_entries=500000, iterable=True)
|
|
||||||
async def get_rooms_for_user_with_stream_ordering(
|
|
||||||
self, user_id: str
|
|
||||||
) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
|
|
||||||
"""Returns a set of room_ids the user is currently joined to.
|
|
||||||
|
|
||||||
If a remote user only returns rooms this server is currently
|
|
||||||
participating in.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user_id
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Returns the rooms the user is in currently, along with the stream
|
|
||||||
ordering of the most recent join for that user and room, along with
|
|
||||||
the room version of the room.
|
|
||||||
"""
|
|
||||||
return await self.db_pool.runInteraction(
|
|
||||||
"get_rooms_for_user_with_stream_ordering",
|
|
||||||
self._get_rooms_for_user_with_stream_ordering_txn,
|
|
||||||
user_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _get_rooms_for_user_with_stream_ordering_txn(
|
|
||||||
self, txn: LoggingTransaction, user_id: str
|
|
||||||
) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
|
|
||||||
# We use `current_state_events` here and not `local_current_membership`
|
|
||||||
# as a) this gets called with remote users and b) this only gets called
|
|
||||||
# for rooms the server is participating in.
|
|
||||||
sql = """
|
|
||||||
SELECT room_id, e.instance_name, e.stream_ordering
|
|
||||||
FROM current_state_events AS c
|
|
||||||
INNER JOIN events AS e USING (room_id, event_id)
|
|
||||||
WHERE
|
|
||||||
c.type = 'm.room.member'
|
|
||||||
AND c.state_key = ?
|
|
||||||
AND c.membership = ?
|
|
||||||
"""
|
|
||||||
|
|
||||||
txn.execute(sql, (user_id, Membership.JOIN))
|
|
||||||
return frozenset(
|
|
||||||
GetRoomsForUserWithStreamOrdering(
|
|
||||||
room_id, PersistedEventPosition(instance, stream_id)
|
|
||||||
)
|
|
||||||
for room_id, instance, stream_id in txn
|
|
||||||
)
|
|
||||||
|
|
||||||
async def get_users_server_still_shares_room_with(
|
async def get_users_server_still_shares_room_with(
|
||||||
self, user_ids: Collection[str]
|
self, user_ids: Collection[str]
|
||||||
) -> Set[str]:
|
) -> Set[str]:
|
||||||
|
@ -701,13 +653,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||||
If a remote user only returns rooms this server is currently
|
If a remote user only returns rooms this server is currently
|
||||||
participating in.
|
participating in.
|
||||||
"""
|
"""
|
||||||
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
|
|
||||||
(user_id,),
|
|
||||||
None,
|
|
||||||
update_metrics=False,
|
|
||||||
)
|
|
||||||
if rooms:
|
|
||||||
return frozenset(r.room_id for r in rooms)
|
|
||||||
|
|
||||||
room_ids = await self.db_pool.simple_select_onecol(
|
room_ids = await self.db_pool.simple_select_onecol(
|
||||||
table="current_state_events",
|
table="current_state_events",
|
||||||
|
|
|
@ -371,7 +371,7 @@ def _make_generic_sql_bound(
|
||||||
def _filter_results(
|
def _filter_results(
|
||||||
lower_token: Optional[RoomStreamToken],
|
lower_token: Optional[RoomStreamToken],
|
||||||
upper_token: Optional[RoomStreamToken],
|
upper_token: Optional[RoomStreamToken],
|
||||||
instance_name: str,
|
instance_name: Optional[str],
|
||||||
topological_ordering: int,
|
topological_ordering: int,
|
||||||
stream_ordering: int,
|
stream_ordering: int,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
@ -384,8 +384,14 @@ def _filter_results(
|
||||||
position maps, which we handle by fetching more than necessary from the DB
|
position maps, which we handle by fetching more than necessary from the DB
|
||||||
and then filtering (rather than attempting to construct a complicated SQL
|
and then filtering (rather than attempting to construct a complicated SQL
|
||||||
query).
|
query).
|
||||||
|
|
||||||
|
The `instance_name` arg is optional to handle historic rows, and is
|
||||||
|
interpreted as if it was "master".
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if instance_name is None:
|
||||||
|
instance_name = "master"
|
||||||
|
|
||||||
event_historical_tuple = (
|
event_historical_tuple = (
|
||||||
topological_ordering,
|
topological_ordering,
|
||||||
stream_ordering,
|
stream_ordering,
|
||||||
|
@ -420,7 +426,7 @@ def _filter_results(
|
||||||
def _filter_results_by_stream(
|
def _filter_results_by_stream(
|
||||||
lower_token: Optional[RoomStreamToken],
|
lower_token: Optional[RoomStreamToken],
|
||||||
upper_token: Optional[RoomStreamToken],
|
upper_token: Optional[RoomStreamToken],
|
||||||
instance_name: str,
|
instance_name: Optional[str],
|
||||||
stream_ordering: int,
|
stream_ordering: int,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""
|
"""
|
||||||
|
@ -436,7 +442,14 @@ def _filter_results_by_stream(
|
||||||
position maps, which we handle by fetching more than necessary from the DB
|
position maps, which we handle by fetching more than necessary from the DB
|
||||||
and then filtering (rather than attempting to construct a complicated SQL
|
and then filtering (rather than attempting to construct a complicated SQL
|
||||||
query).
|
query).
|
||||||
|
|
||||||
|
The `instance_name` arg is optional to handle historic rows, and is
|
||||||
|
interpreted as if it was "master".
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if instance_name is None:
|
||||||
|
instance_name = "master"
|
||||||
|
|
||||||
if lower_token:
|
if lower_token:
|
||||||
assert lower_token.topological is None
|
assert lower_token.topological is None
|
||||||
|
|
||||||
|
@ -912,7 +925,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
prev_sender,
|
prev_sender,
|
||||||
) in txn:
|
) in txn:
|
||||||
assert room_id is not None
|
assert room_id is not None
|
||||||
assert instance_name is not None
|
|
||||||
assert stream_ordering is not None
|
assert stream_ordering is not None
|
||||||
|
|
||||||
if _filter_results_by_stream(
|
if _filter_results_by_stream(
|
||||||
|
@ -936,7 +948,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
# Event
|
# Event
|
||||||
event_id=event_id,
|
event_id=event_id,
|
||||||
event_pos=PersistedEventPosition(
|
event_pos=PersistedEventPosition(
|
||||||
instance_name=instance_name,
|
# If instance_name is null we default to "master"
|
||||||
|
instance_name=instance_name or "master",
|
||||||
stream=stream_ordering,
|
stream=stream_ordering,
|
||||||
),
|
),
|
||||||
# When `s.event_id = null`, we won't be able to get respective
|
# When `s.event_id = null`, we won't be able to get respective
|
||||||
|
@ -952,13 +965,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
prev_event_id=prev_event_id,
|
prev_event_id=prev_event_id,
|
||||||
prev_event_pos=(
|
prev_event_pos=(
|
||||||
PersistedEventPosition(
|
PersistedEventPosition(
|
||||||
instance_name=prev_instance_name,
|
# If instance_name is null we default to "master"
|
||||||
|
instance_name=prev_instance_name or "master",
|
||||||
stream=prev_stream_ordering,
|
stream=prev_stream_ordering,
|
||||||
)
|
)
|
||||||
if (
|
if (prev_stream_ordering is not None)
|
||||||
prev_instance_name is not None
|
|
||||||
and prev_stream_ordering is not None
|
|
||||||
)
|
|
||||||
else None
|
else None
|
||||||
),
|
),
|
||||||
prev_membership=prev_membership,
|
prev_membership=prev_membership,
|
||||||
|
@ -1270,7 +1281,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
stream_ordering=stream_ordering,
|
stream_ordering=stream_ordering,
|
||||||
):
|
):
|
||||||
return event_id, PersistedEventPosition(
|
return event_id, PersistedEventPosition(
|
||||||
instance_name, stream_ordering
|
# If instance_name is null we default to "master"
|
||||||
|
instance_name or "master",
|
||||||
|
stream_ordering,
|
||||||
)
|
)
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
|
@ -210,7 +210,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Blow away caches (supported room versions can only change due to a restart).
|
# Blow away caches (supported room versions can only change due to a restart).
|
||||||
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
|
|
||||||
self.store.get_rooms_for_user.invalidate_all()
|
self.store.get_rooms_for_user.invalidate_all()
|
||||||
self.store._get_event_cache.clear()
|
self.store._get_event_cache.clear()
|
||||||
self.store._event_ref.clear()
|
self.store._event_ref.clear()
|
||||||
|
|
|
@ -30,19 +30,16 @@ from synapse.api.constants import ReceiptTypes
|
||||||
from synapse.api.room_versions import RoomVersions
|
from synapse.api.room_versions import RoomVersions
|
||||||
from synapse.events import EventBase, make_event_from_dict
|
from synapse.events import EventBase, make_event_from_dict
|
||||||
from synapse.events.snapshot import EventContext
|
from synapse.events.snapshot import EventContext
|
||||||
from synapse.handlers.room import RoomEventSource
|
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.storage.databases.main.event_push_actions import (
|
from synapse.storage.databases.main.event_push_actions import (
|
||||||
NotifCounts,
|
NotifCounts,
|
||||||
RoomNotifCounts,
|
RoomNotifCounts,
|
||||||
)
|
)
|
||||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||||
from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
|
from synapse.storage.roommember import RoomsForUser
|
||||||
from synapse.types import PersistedEventPosition
|
from synapse.types import PersistedEventPosition
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
from tests.server import FakeTransport
|
|
||||||
|
|
||||||
from ._base import BaseWorkerStoreTestCase
|
from ._base import BaseWorkerStoreTestCase
|
||||||
|
|
||||||
USER_ID = "@feeling:test"
|
USER_ID = "@feeling:test"
|
||||||
|
@ -221,125 +218,6 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_get_rooms_for_user_with_stream_ordering(self) -> None:
|
|
||||||
"""Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
|
|
||||||
by rows in the events stream
|
|
||||||
"""
|
|
||||||
self.persist(type="m.room.create", key="", creator=USER_ID)
|
|
||||||
self.persist(type="m.room.member", key=USER_ID, membership="join")
|
|
||||||
self.replicate()
|
|
||||||
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
|
|
||||||
|
|
||||||
j2 = self.persist(
|
|
||||||
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
|
|
||||||
)
|
|
||||||
assert j2.internal_metadata.instance_name is not None
|
|
||||||
assert j2.internal_metadata.stream_ordering is not None
|
|
||||||
self.replicate()
|
|
||||||
|
|
||||||
expected_pos = PersistedEventPosition(
|
|
||||||
j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
|
|
||||||
)
|
|
||||||
self.check(
|
|
||||||
"get_rooms_for_user_with_stream_ordering",
|
|
||||||
(USER_ID_2,),
|
|
||||||
{GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
|
|
||||||
self,
|
|
||||||
) -> None:
|
|
||||||
"""Check that current_state invalidation happens correctly with multiple events
|
|
||||||
in the persistence batch.
|
|
||||||
|
|
||||||
This test attempts to reproduce a race condition between the event persistence
|
|
||||||
loop and a worker-based Sync handler.
|
|
||||||
|
|
||||||
The problem occurred when the master persisted several events in one batch. It
|
|
||||||
only updates the current_state at the end of each batch, so the obvious thing
|
|
||||||
to do is then to issue a current_state_delta stream update corresponding to the
|
|
||||||
last stream_id in the batch.
|
|
||||||
|
|
||||||
However, that raises the possibility that a worker will see the replication
|
|
||||||
notification for a join event before the current_state caches are invalidated.
|
|
||||||
|
|
||||||
The test involves:
|
|
||||||
* creating a join and a message event for a user, and persisting them in the
|
|
||||||
same batch
|
|
||||||
|
|
||||||
* controlling the replication stream so that updates are sent gradually
|
|
||||||
|
|
||||||
* between each bunch of replication updates, check that we see a consistent
|
|
||||||
snapshot of the state.
|
|
||||||
"""
|
|
||||||
self.persist(type="m.room.create", key="", creator=USER_ID)
|
|
||||||
self.persist(type="m.room.member", key=USER_ID, membership="join")
|
|
||||||
self.replicate()
|
|
||||||
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
|
|
||||||
|
|
||||||
# limit the replication rate
|
|
||||||
repl_transport = self._server_transport
|
|
||||||
assert isinstance(repl_transport, FakeTransport)
|
|
||||||
repl_transport.autoflush = False
|
|
||||||
|
|
||||||
# build the join and message events and persist them in the same batch.
|
|
||||||
logger.info("----- build test events ------")
|
|
||||||
j2, j2ctx = self.build_event(
|
|
||||||
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
|
|
||||||
)
|
|
||||||
msg, msgctx = self.build_event()
|
|
||||||
self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
|
|
||||||
self.replicate()
|
|
||||||
assert j2.internal_metadata.instance_name is not None
|
|
||||||
assert j2.internal_metadata.stream_ordering is not None
|
|
||||||
|
|
||||||
event_source = RoomEventSource(self.hs)
|
|
||||||
event_source.store = self.worker_store
|
|
||||||
current_token = event_source.get_current_key()
|
|
||||||
|
|
||||||
# gradually stream out the replication
|
|
||||||
while repl_transport.buffer:
|
|
||||||
logger.info("------ flush ------")
|
|
||||||
repl_transport.flush(30)
|
|
||||||
self.pump(0)
|
|
||||||
|
|
||||||
prev_token = current_token
|
|
||||||
current_token = event_source.get_current_key()
|
|
||||||
|
|
||||||
# attempt to replicate the behaviour of the sync handler.
|
|
||||||
#
|
|
||||||
# First, we get a list of the rooms we are joined to
|
|
||||||
joined_rooms = self.get_success(
|
|
||||||
self.worker_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Then, we get a list of the events since the last sync
|
|
||||||
membership_changes = self.get_success(
|
|
||||||
self.worker_store.get_membership_changes_for_user(
|
|
||||||
USER_ID_2, prev_token, current_token
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"%s->%s: joined_rooms=%r membership_changes=%r",
|
|
||||||
prev_token,
|
|
||||||
current_token,
|
|
||||||
joined_rooms,
|
|
||||||
membership_changes,
|
|
||||||
)
|
|
||||||
|
|
||||||
# the membership change is only any use to us if the room is in the
|
|
||||||
# joined_rooms list.
|
|
||||||
if membership_changes:
|
|
||||||
expected_pos = PersistedEventPosition(
|
|
||||||
j2.internal_metadata.instance_name,
|
|
||||||
j2.internal_metadata.stream_ordering,
|
|
||||||
)
|
|
||||||
self.assertEqual(
|
|
||||||
joined_rooms,
|
|
||||||
{GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
|
|
||||||
)
|
|
||||||
|
|
||||||
event_id = 0
|
event_id = 0
|
||||||
|
|
||||||
def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase:
|
def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase:
|
||||||
|
|
Loading…
Reference in a new issue