forked from MirrorHub/synapse
Faster remote room joins: stream the un-partial-stating of events over replication. [rei:frrj/streams/unpsr] (#14545)
This commit is contained in:
parent
24a97b3e71
commit
fb60cb16fe
8 changed files with 204 additions and 10 deletions
1
changelog.d/14545.misc
Normal file
1
changelog.d/14545.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Faster remote room joins: stream the un-partial-stating of events over replication.
|
|
@ -610,6 +610,8 @@ class FederationEventHandler:
|
||||||
self._state_storage_controller.notify_event_un_partial_stated(
|
self._state_storage_controller.notify_event_un_partial_stated(
|
||||||
event.event_id
|
event.event_id
|
||||||
)
|
)
|
||||||
|
# Notify that there's a new row in the un_partial_stated_events stream.
|
||||||
|
self._notifier.notify_replication()
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
async def backfill(
|
async def backfill(
|
||||||
|
|
|
@ -42,7 +42,10 @@ from synapse.replication.tcp.streams._base import (
|
||||||
)
|
)
|
||||||
from synapse.replication.tcp.streams.events import EventsStream
|
from synapse.replication.tcp.streams.events import EventsStream
|
||||||
from synapse.replication.tcp.streams.federation import FederationStream
|
from synapse.replication.tcp.streams.federation import FederationStream
|
||||||
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
|
from synapse.replication.tcp.streams.partial_state import (
|
||||||
|
UnPartialStatedEventStream,
|
||||||
|
UnPartialStatedRoomStream,
|
||||||
|
)
|
||||||
|
|
||||||
STREAMS_MAP = {
|
STREAMS_MAP = {
|
||||||
stream.NAME: stream
|
stream.NAME: stream
|
||||||
|
@ -63,6 +66,7 @@ STREAMS_MAP = {
|
||||||
AccountDataStream,
|
AccountDataStream,
|
||||||
UserSignatureStream,
|
UserSignatureStream,
|
||||||
UnPartialStatedRoomStream,
|
UnPartialStatedRoomStream,
|
||||||
|
UnPartialStatedEventStream,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,4 +87,5 @@ __all__ = [
|
||||||
"AccountDataStream",
|
"AccountDataStream",
|
||||||
"UserSignatureStream",
|
"UserSignatureStream",
|
||||||
"UnPartialStatedRoomStream",
|
"UnPartialStatedRoomStream",
|
||||||
|
"UnPartialStatedEventStream",
|
||||||
]
|
]
|
||||||
|
|
|
@ -46,3 +46,31 @@ class UnPartialStatedRoomStream(Stream):
|
||||||
current_token_without_instance(store.get_un_partial_stated_rooms_token),
|
current_token_without_instance(store.get_un_partial_stated_rooms_token),
|
||||||
store.get_un_partial_stated_rooms_from_stream,
|
store.get_un_partial_stated_rooms_from_stream,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
|
class UnPartialStatedEventStreamRow:
|
||||||
|
# ID of the event that has been un-partial-stated.
|
||||||
|
event_id: str
|
||||||
|
|
||||||
|
# True iff the rejection status of the event changed as a result of being
|
||||||
|
# un-partial-stated.
|
||||||
|
rejection_status_changed: bool
|
||||||
|
|
||||||
|
|
||||||
|
class UnPartialStatedEventStream(Stream):
|
||||||
|
"""
|
||||||
|
Stream to notify about events becoming un-partial-stated.
|
||||||
|
"""
|
||||||
|
|
||||||
|
NAME = "un_partial_stated_event"
|
||||||
|
ROW_TYPE = UnPartialStatedEventStreamRow
|
||||||
|
|
||||||
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
store = hs.get_datastores().main
|
||||||
|
super().__init__(
|
||||||
|
hs.get_instance_name(),
|
||||||
|
# TODO(faster_joins, multiple writers): we need to account for instance names
|
||||||
|
current_token_without_instance(store.get_un_partial_stated_events_token),
|
||||||
|
store.get_un_partial_stated_events_from_stream,
|
||||||
|
)
|
||||||
|
|
|
@ -70,6 +70,7 @@ from synapse.storage.database import (
|
||||||
from synapse.storage.engines import PostgresEngine
|
from synapse.storage.engines import PostgresEngine
|
||||||
from synapse.storage.types import Cursor
|
from synapse.storage.types import Cursor
|
||||||
from synapse.storage.util.id_generators import (
|
from synapse.storage.util.id_generators import (
|
||||||
|
AbstractStreamIdGenerator,
|
||||||
AbstractStreamIdTracker,
|
AbstractStreamIdTracker,
|
||||||
MultiWriterIdGenerator,
|
MultiWriterIdGenerator,
|
||||||
StreamIdGenerator,
|
StreamIdGenerator,
|
||||||
|
@ -292,6 +293,93 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
id_column="chain_id",
|
id_column="chain_id",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator
|
||||||
|
|
||||||
|
if isinstance(database.engine, PostgresEngine):
|
||||||
|
self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
|
||||||
|
db_conn=db_conn,
|
||||||
|
db=database,
|
||||||
|
stream_name="un_partial_stated_event_stream",
|
||||||
|
instance_name=hs.get_instance_name(),
|
||||||
|
tables=[
|
||||||
|
("un_partial_stated_event_stream", "instance_name", "stream_id")
|
||||||
|
],
|
||||||
|
sequence_name="un_partial_stated_event_stream_sequence",
|
||||||
|
# TODO(faster_joins, multiple writers) Support multiple writers.
|
||||||
|
writers=["master"],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
|
||||||
|
db_conn, "un_partial_stated_event_stream", "stream_id"
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_un_partial_stated_events_token(self) -> int:
|
||||||
|
# TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
|
||||||
|
# writers because workers that don't write often will hold all
|
||||||
|
# readers up.
|
||||||
|
return self._un_partial_stated_events_stream_id_gen.get_current_token()
|
||||||
|
|
||||||
|
async def get_un_partial_stated_events_from_stream(
|
||||||
|
self, instance_name: str, last_id: int, current_id: int, limit: int
|
||||||
|
) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
|
||||||
|
"""Get updates for the un-partial-stated events replication stream.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
instance_name: The writer we want to fetch updates from. Unused
|
||||||
|
here since there is only ever one writer.
|
||||||
|
last_id: The token to fetch updates from. Exclusive.
|
||||||
|
current_id: The token to fetch updates up to. Inclusive.
|
||||||
|
limit: The requested limit for the number of rows to return. The
|
||||||
|
function may return more or fewer rows.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A tuple consisting of: the updates, a token to use to fetch
|
||||||
|
subsequent updates, and whether we returned fewer rows than exists
|
||||||
|
between the requested tokens due to the limit.
|
||||||
|
|
||||||
|
The token returned can be used in a subsequent call to this
|
||||||
|
function to get further updatees.
|
||||||
|
|
||||||
|
The updates are a list of 2-tuples of stream ID and the row data
|
||||||
|
"""
|
||||||
|
|
||||||
|
if last_id == current_id:
|
||||||
|
return [], current_id, False
|
||||||
|
|
||||||
|
def get_un_partial_stated_events_from_stream_txn(
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
|
||||||
|
sql = """
|
||||||
|
SELECT stream_id, event_id, rejection_status_changed
|
||||||
|
FROM un_partial_stated_event_stream
|
||||||
|
WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
|
||||||
|
ORDER BY stream_id ASC
|
||||||
|
LIMIT ?
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (last_id, current_id, instance_name, limit))
|
||||||
|
updates = [
|
||||||
|
(
|
||||||
|
row[0],
|
||||||
|
(
|
||||||
|
row[1],
|
||||||
|
bool(row[2]),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for row in txn
|
||||||
|
]
|
||||||
|
limited = False
|
||||||
|
upto_token = current_id
|
||||||
|
if len(updates) >= limit:
|
||||||
|
upto_token = updates[-1][0]
|
||||||
|
limited = True
|
||||||
|
|
||||||
|
return updates, upto_token, limited
|
||||||
|
|
||||||
|
return await self.db_pool.runInteraction(
|
||||||
|
"get_un_partial_stated_events_from_stream",
|
||||||
|
get_un_partial_stated_events_from_stream_txn,
|
||||||
|
)
|
||||||
|
|
||||||
def process_replication_rows(
|
def process_replication_rows(
|
||||||
self,
|
self,
|
||||||
stream_name: str,
|
stream_name: str,
|
||||||
|
|
|
@ -80,6 +80,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
hs: "HomeServer",
|
hs: "HomeServer",
|
||||||
):
|
):
|
||||||
super().__init__(database, db_conn, hs)
|
super().__init__(database, db_conn, hs)
|
||||||
|
self._instance_name: str = hs.get_instance_name()
|
||||||
|
|
||||||
async def get_room_version(self, room_id: str) -> RoomVersion:
|
async def get_room_version(self, room_id: str) -> RoomVersion:
|
||||||
"""Get the room_version of a given room
|
"""Get the room_version of a given room
|
||||||
|
@ -404,11 +405,13 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
context: EventContext,
|
context: EventContext,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Update the state group for a partial state event"""
|
"""Update the state group for a partial state event"""
|
||||||
|
async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id:
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
"update_state_for_partial_state_event",
|
"update_state_for_partial_state_event",
|
||||||
self._update_state_for_partial_state_event_txn,
|
self._update_state_for_partial_state_event_txn,
|
||||||
event,
|
event,
|
||||||
context,
|
context,
|
||||||
|
un_partial_state_event_stream_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _update_state_for_partial_state_event_txn(
|
def _update_state_for_partial_state_event_txn(
|
||||||
|
@ -416,6 +419,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
txn: LoggingTransaction,
|
txn: LoggingTransaction,
|
||||||
event: EventBase,
|
event: EventBase,
|
||||||
context: EventContext,
|
context: EventContext,
|
||||||
|
un_partial_state_event_stream_id: int,
|
||||||
) -> None:
|
) -> None:
|
||||||
# we shouldn't have any outliers here
|
# we shouldn't have any outliers here
|
||||||
assert not event.internal_metadata.is_outlier()
|
assert not event.internal_metadata.is_outlier()
|
||||||
|
@ -436,7 +440,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
# the event may now be rejected where it was not before, or vice versa,
|
# the event may now be rejected where it was not before, or vice versa,
|
||||||
# in which case we need to update the rejected flags.
|
# in which case we need to update the rejected flags.
|
||||||
if bool(context.rejected) != (event.rejected_reason is not None):
|
rejection_status_changed = bool(context.rejected) != (
|
||||||
|
event.rejected_reason is not None
|
||||||
|
)
|
||||||
|
if rejection_status_changed:
|
||||||
self.mark_event_rejected_txn(txn, event.event_id, context.rejected)
|
self.mark_event_rejected_txn(txn, event.event_id, context.rejected)
|
||||||
|
|
||||||
self.db_pool.simple_delete_one_txn(
|
self.db_pool.simple_delete_one_txn(
|
||||||
|
@ -445,8 +452,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
keyvalues={"event_id": event.event_id},
|
keyvalues={"event_id": event.event_id},
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO(faster_joins): need to do something about workers here
|
|
||||||
# https://github.com/matrix-org/synapse/issues/12994
|
|
||||||
txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
|
txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self._get_state_group_for_event.prefill,
|
self._get_state_group_for_event.prefill,
|
||||||
|
@ -454,6 +459,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
state_group,
|
state_group,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.db_pool.simple_insert_txn(
|
||||||
|
txn,
|
||||||
|
"un_partial_stated_event_stream",
|
||||||
|
{
|
||||||
|
"stream_id": un_partial_state_event_stream_id,
|
||||||
|
"instance_name": self._instance_name,
|
||||||
|
"event_id": event.event_id,
|
||||||
|
"rejection_status_changed": rejection_status_changed,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
|
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
-- Stream for notifying that an event has become un-partial-stated.
|
||||||
|
CREATE TABLE un_partial_stated_event_stream(
|
||||||
|
-- Position in the stream
|
||||||
|
stream_id BIGINT PRIMARY KEY NOT NULL,
|
||||||
|
|
||||||
|
-- Which instance wrote this entry.
|
||||||
|
instance_name TEXT NOT NULL,
|
||||||
|
|
||||||
|
-- Which event has been un-partial-stated.
|
||||||
|
event_id TEXT NOT NULL REFERENCES events(event_id) ON DELETE CASCADE,
|
||||||
|
|
||||||
|
-- true iff the `rejected` status of the event changed when it became
|
||||||
|
-- un-partial-stated.
|
||||||
|
rejection_status_changed BOOLEAN NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
-- We want an index here because of the foreign key constraint:
|
||||||
|
-- upon deleting an event, the database needs to be able to check here.
|
||||||
|
CREATE UNIQUE INDEX un_partial_stated_event_stream_room_id ON un_partial_stated_event_stream (event_id);
|
|
@ -0,0 +1,20 @@
|
||||||
|
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
CREATE SEQUENCE IF NOT EXISTS un_partial_stated_event_stream_sequence;
|
||||||
|
|
||||||
|
SELECT setval('un_partial_stated_event_stream_sequence', (
|
||||||
|
SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_event_stream
|
||||||
|
));
|
Loading…
Reference in a new issue