mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 04:53:53 +01:00
Fix get_last_event_in_room_before_stream_ordering(...)
finding the wrong last event (#17295)
PR where this was introduced: https://github.com/matrix-org/synapse/pull/14817 ### What does this affect? `get_last_event_in_room_before_stream_ordering(...)` is used in Sync v2 in a lot of different state calculations. `get_last_event_in_room_before_stream_ordering(...)` is also used in `/rooms/{roomId}/members`
This commit is contained in:
parent
c6eb99c878
commit
ebdce69f6a
3 changed files with 289 additions and 11 deletions
1
changelog.d/17295.bugfix
Normal file
1
changelog.d/17295.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix edge case in `/sync` returning the wrong the state when using sharded event persisters.
|
|
@ -914,12 +914,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
def get_last_event_in_room_before_stream_ordering_txn(
|
def get_last_event_in_room_before_stream_ordering_txn(
|
||||||
txn: LoggingTransaction,
|
txn: LoggingTransaction,
|
||||||
) -> Optional[str]:
|
) -> Optional[str]:
|
||||||
# We need to handle the fact that the stream tokens can be vector
|
# We're looking for the closest event at or before the token. We need to
|
||||||
# clocks. We do this by getting all rows between the minimum and
|
# handle the fact that the stream token can be a vector clock (with an
|
||||||
# maximum stream ordering in the token, plus one row less than the
|
# `instance_map`) and events can be persisted on different instances
|
||||||
# minimum stream ordering. We then filter the results against the
|
# (sharded event persisters). The first subquery handles the events that
|
||||||
# token and return the first row that matches.
|
# would be within the vector clock and gets all rows between the minimum and
|
||||||
|
# maximum stream ordering in the token which need to be filtered against the
|
||||||
|
# `instance_map`. The second subquery handles the "before" case and finds
|
||||||
|
# the first row before the token. We then filter out any results past the
|
||||||
|
# token's vector clock and return the first row that matches.
|
||||||
|
min_stream = end_token.stream
|
||||||
|
max_stream = end_token.get_max_stream_pos()
|
||||||
|
|
||||||
|
# We use `union all` because we don't need any of the deduplication logic
|
||||||
|
# (`union` is really a union + distinct). `UNION ALL` does preserve the
|
||||||
|
# ordering of the operand queries but there is no actual gurantee that it
|
||||||
|
# has this behavior in all scenarios so we need the extra `ORDER BY` at the
|
||||||
|
# bottom.
|
||||||
sql = """
|
sql = """
|
||||||
SELECT * FROM (
|
SELECT * FROM (
|
||||||
SELECT instance_name, stream_ordering, topological_ordering, event_id
|
SELECT instance_name, stream_ordering, topological_ordering, event_id
|
||||||
|
@ -931,7 +942,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
AND rejections.event_id IS NULL
|
AND rejections.event_id IS NULL
|
||||||
ORDER BY stream_ordering DESC
|
ORDER BY stream_ordering DESC
|
||||||
) AS a
|
) AS a
|
||||||
UNION
|
UNION ALL
|
||||||
SELECT * FROM (
|
SELECT * FROM (
|
||||||
SELECT instance_name, stream_ordering, topological_ordering, event_id
|
SELECT instance_name, stream_ordering, topological_ordering, event_id
|
||||||
FROM events
|
FROM events
|
||||||
|
@ -943,15 +954,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
ORDER BY stream_ordering DESC
|
ORDER BY stream_ordering DESC
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
) AS b
|
) AS b
|
||||||
|
ORDER BY stream_ordering DESC
|
||||||
"""
|
"""
|
||||||
txn.execute(
|
txn.execute(
|
||||||
sql,
|
sql,
|
||||||
(
|
(
|
||||||
room_id,
|
room_id,
|
||||||
end_token.stream,
|
min_stream,
|
||||||
end_token.get_max_stream_pos(),
|
max_stream,
|
||||||
room_id,
|
room_id,
|
||||||
end_token.stream,
|
min_stream,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,10 @@
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
|
||||||
from typing import List
|
import logging
|
||||||
|
from typing import List, Tuple
|
||||||
|
|
||||||
|
from immutabledict import immutabledict
|
||||||
|
|
||||||
from twisted.test.proto_helpers import MemoryReactor
|
from twisted.test.proto_helpers import MemoryReactor
|
||||||
|
|
||||||
|
@ -28,11 +31,13 @@ from synapse.api.filtering import Filter
|
||||||
from synapse.rest import admin
|
from synapse.rest import admin
|
||||||
from synapse.rest.client import login, room
|
from synapse.rest.client import login, room
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
from tests.unittest import HomeserverTestCase
|
from tests.unittest import HomeserverTestCase
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class PaginationTestCase(HomeserverTestCase):
|
class PaginationTestCase(HomeserverTestCase):
|
||||||
"""
|
"""
|
||||||
|
@ -268,3 +273,263 @@ class PaginationTestCase(HomeserverTestCase):
|
||||||
}
|
}
|
||||||
chunk = self._filter_messages(filter)
|
chunk = self._filter_messages(filter)
|
||||||
self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none])
|
self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none])
|
||||||
|
|
||||||
|
|
||||||
|
class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
|
||||||
|
"""
|
||||||
|
Test `get_last_event_in_room_before_stream_ordering(...)`
|
||||||
|
"""
|
||||||
|
|
||||||
|
servlets = [
|
||||||
|
admin.register_servlets,
|
||||||
|
room.register_servlets,
|
||||||
|
login.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||||
|
self.store = hs.get_datastores().main
|
||||||
|
self.event_sources = hs.get_event_sources()
|
||||||
|
|
||||||
|
def _update_persisted_instance_name_for_event(
|
||||||
|
self, event_id: str, instance_name: str
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Update the `instance_name` that persisted the the event in the database.
|
||||||
|
"""
|
||||||
|
return self.get_success(
|
||||||
|
self.store.db_pool.simple_update_one(
|
||||||
|
"events",
|
||||||
|
keyvalues={"event_id": event_id},
|
||||||
|
updatevalues={"instance_name": instance_name},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _send_event_on_instance(
|
||||||
|
self, instance_name: str, room_id: str, access_token: str
|
||||||
|
) -> Tuple[JsonDict, PersistedEventPosition]:
|
||||||
|
"""
|
||||||
|
Send an event in a room and mimic that it was persisted by a specific
|
||||||
|
instance/worker.
|
||||||
|
"""
|
||||||
|
event_response = self.helper.send(
|
||||||
|
room_id, f"{instance_name} message", tok=access_token
|
||||||
|
)
|
||||||
|
|
||||||
|
self._update_persisted_instance_name_for_event(
|
||||||
|
event_response["event_id"], instance_name
|
||||||
|
)
|
||||||
|
|
||||||
|
event_pos = self.get_success(
|
||||||
|
self.store.get_position_for_event(event_response["event_id"])
|
||||||
|
)
|
||||||
|
|
||||||
|
return event_response, event_pos
|
||||||
|
|
||||||
|
def test_before_room_created(self) -> None:
|
||||||
|
"""
|
||||||
|
Test that no event is returned if we are using a token before the room was even created
|
||||||
|
"""
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
|
||||||
|
before_room_token = self.event_sources.get_current_token()
|
||||||
|
|
||||||
|
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||||
|
|
||||||
|
last_event = self.get_success(
|
||||||
|
self.store.get_last_event_in_room_before_stream_ordering(
|
||||||
|
room_id=room_id,
|
||||||
|
end_token=before_room_token.room_key,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertIsNone(last_event)
|
||||||
|
|
||||||
|
def test_after_room_created(self) -> None:
|
||||||
|
"""
|
||||||
|
Test that an event is returned if we are using a token after the room was created
|
||||||
|
"""
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
|
||||||
|
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||||
|
|
||||||
|
after_room_token = self.event_sources.get_current_token()
|
||||||
|
|
||||||
|
last_event = self.get_success(
|
||||||
|
self.store.get_last_event_in_room_before_stream_ordering(
|
||||||
|
room_id=room_id,
|
||||||
|
end_token=after_room_token.room_key,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertIsNotNone(last_event)
|
||||||
|
|
||||||
|
def test_activity_in_other_rooms(self) -> None:
|
||||||
|
"""
|
||||||
|
Test to make sure that the last event in the room is returned even if the
|
||||||
|
`stream_ordering` has advanced from activity in other rooms.
|
||||||
|
"""
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
|
||||||
|
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||||
|
event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
|
||||||
|
# Create another room to advance the stream_ordering
|
||||||
|
self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||||
|
|
||||||
|
after_room_token = self.event_sources.get_current_token()
|
||||||
|
|
||||||
|
last_event = self.get_success(
|
||||||
|
self.store.get_last_event_in_room_before_stream_ordering(
|
||||||
|
room_id=room_id1,
|
||||||
|
end_token=after_room_token.room_key,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make sure it's the event we expect (which also means we know it's from the
|
||||||
|
# correct room)
|
||||||
|
self.assertEqual(last_event, event_response["event_id"])
|
||||||
|
|
||||||
|
def test_activity_after_token_has_no_effect(self) -> None:
|
||||||
|
"""
|
||||||
|
Test to make sure we return the last event before the token even if there is
|
||||||
|
activity after it.
|
||||||
|
"""
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
|
||||||
|
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||||
|
event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
|
||||||
|
|
||||||
|
after_room_token = self.event_sources.get_current_token()
|
||||||
|
|
||||||
|
# Send some events after the token
|
||||||
|
self.helper.send(room_id1, "after1", tok=user1_tok)
|
||||||
|
self.helper.send(room_id1, "after2", tok=user1_tok)
|
||||||
|
|
||||||
|
last_event = self.get_success(
|
||||||
|
self.store.get_last_event_in_room_before_stream_ordering(
|
||||||
|
room_id=room_id1,
|
||||||
|
end_token=after_room_token.room_key,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make sure it's the last event before the token
|
||||||
|
self.assertEqual(last_event, event_response["event_id"])
|
||||||
|
|
||||||
|
def test_last_event_within_sharded_token(self) -> None:
|
||||||
|
"""
|
||||||
|
Test to make sure we can find the last event that that is *within* the sharded
|
||||||
|
token (a token that has an `instance_map` and looks like
|
||||||
|
`m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing
|
||||||
|
that we can find an event within the tokens minimum and instance
|
||||||
|
`stream_ordering`.
|
||||||
|
"""
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
|
||||||
|
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||||
|
event_response1, event_pos1 = self._send_event_on_instance(
|
||||||
|
"worker1", room_id1, user1_tok
|
||||||
|
)
|
||||||
|
event_response2, event_pos2 = self._send_event_on_instance(
|
||||||
|
"worker1", room_id1, user1_tok
|
||||||
|
)
|
||||||
|
event_response3, event_pos3 = self._send_event_on_instance(
|
||||||
|
"worker1", room_id1, user1_tok
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create another room to advance the `stream_ordering` on the same worker
|
||||||
|
# so we can sandwich event3 in the middle of the token
|
||||||
|
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||||
|
event_response4, event_pos4 = self._send_event_on_instance(
|
||||||
|
"worker1", room_id2, user1_tok
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assemble a token that encompasses event1 -> event4 on worker1
|
||||||
|
end_token = RoomStreamToken(
|
||||||
|
stream=event_pos2.stream,
|
||||||
|
instance_map=immutabledict({"worker1": event_pos4.stream}),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Send some events after the token
|
||||||
|
self.helper.send(room_id1, "after1", tok=user1_tok)
|
||||||
|
self.helper.send(room_id1, "after2", tok=user1_tok)
|
||||||
|
|
||||||
|
last_event = self.get_success(
|
||||||
|
self.store.get_last_event_in_room_before_stream_ordering(
|
||||||
|
room_id=room_id1,
|
||||||
|
end_token=end_token,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Should find closest event at/before the token in room1
|
||||||
|
self.assertEqual(
|
||||||
|
last_event,
|
||||||
|
event_response3["event_id"],
|
||||||
|
f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to "
|
||||||
|
+ str(
|
||||||
|
{
|
||||||
|
"event1": event_response1["event_id"],
|
||||||
|
"event2": event_response2["event_id"],
|
||||||
|
"event3": event_response3["event_id"],
|
||||||
|
}
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_last_event_before_sharded_token(self) -> None:
|
||||||
|
"""
|
||||||
|
Test to make sure we can find the last event that is *before* the sharded token
|
||||||
|
(a token that has an `instance_map` and looks like
|
||||||
|
`m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`).
|
||||||
|
"""
|
||||||
|
user1_id = self.register_user("user1", "pass")
|
||||||
|
user1_tok = self.login(user1_id, "pass")
|
||||||
|
|
||||||
|
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||||
|
event_response1, event_pos1 = self._send_event_on_instance(
|
||||||
|
"worker1", room_id1, user1_tok
|
||||||
|
)
|
||||||
|
event_response2, event_pos2 = self._send_event_on_instance(
|
||||||
|
"worker1", room_id1, user1_tok
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create another room to advance the `stream_ordering` on the same worker
|
||||||
|
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||||
|
event_response3, event_pos3 = self._send_event_on_instance(
|
||||||
|
"worker1", room_id2, user1_tok
|
||||||
|
)
|
||||||
|
event_response4, event_pos4 = self._send_event_on_instance(
|
||||||
|
"worker1", room_id2, user1_tok
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assemble a token that encompasses event3 -> event4 on worker1
|
||||||
|
end_token = RoomStreamToken(
|
||||||
|
stream=event_pos3.stream,
|
||||||
|
instance_map=immutabledict({"worker1": event_pos4.stream}),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Send some events after the token
|
||||||
|
self.helper.send(room_id1, "after1", tok=user1_tok)
|
||||||
|
self.helper.send(room_id1, "after2", tok=user1_tok)
|
||||||
|
|
||||||
|
last_event = self.get_success(
|
||||||
|
self.store.get_last_event_in_room_before_stream_ordering(
|
||||||
|
room_id=room_id1,
|
||||||
|
end_token=end_token,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Should find closest event at/before the token in room1
|
||||||
|
self.assertEqual(
|
||||||
|
last_event,
|
||||||
|
event_response2["event_id"],
|
||||||
|
f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to "
|
||||||
|
+ str(
|
||||||
|
{
|
||||||
|
"event1": event_response1["event_id"],
|
||||||
|
"event2": event_response2["event_id"],
|
||||||
|
}
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in a new issue