Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
This commit is contained in:
commit
7eea8de9de
17
CHANGES.md
17
CHANGES.md
|
@ -1,3 +1,20 @@
|
|||
Synapse 1.37.1 (2021-06-30)
|
||||
===========================
|
||||
|
||||
This release resolves issues (such as [#9490](https://github.com/matrix-org/synapse/issues/9490)) where one busy room could cause head-of-line blocking, starving Synapse from processing events in other rooms, and causing all federated traffic to fall behind. Synapse 1.37.1 processes inbound federation traffic asynchronously, ensuring that one busy room won't impact others. Please upgrade to Synapse 1.37.1 as soon as possible, in order to increase resilience to other traffic spikes.
|
||||
|
||||
No significant changes since v1.37.1rc1.
|
||||
|
||||
|
||||
Synapse 1.37.1rc1 (2021-06-29)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Handle inbound events from federation asynchronously. ([\#10269](https://github.com/matrix-org/synapse/issues/10269), [\#10272](https://github.com/matrix-org/synapse/issues/10272))
|
||||
|
||||
|
||||
Synapse 1.37.0 (2021-06-29)
|
||||
===========================
|
||||
|
||||
|
|
1
changelog.d/10253.misc
Normal file
1
changelog.d/10253.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Fix type hints for computing auth events.
|
1
changelog.d/10256.misc
Normal file
1
changelog.d/10256.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Improve the performance of the spaces summary endpoint by only recursing into spaces (and not rooms in general).
|
|
@ -1 +0,0 @@
|
|||
Handle inbound events from federation asynchronously.
|
|
@ -1 +0,0 @@
|
|||
Handle inbound events from federation asynchronously.
|
1
changelog.d/10279.bugfix
Normal file
1
changelog.d/10279.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix the prometheus `synapse_federation_server_pdu_process_time` metric. Broke in v1.37.1.
|
1
changelog.d/10282.bugfix
Normal file
1
changelog.d/10282.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix a long-standing bug where Synapse would return errors after 2<sup>31</sup> events were handled by the server.
|
1
changelog.d/10286.bugfix
Normal file
1
changelog.d/10286.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix a long-standing bug where Synapse would return errors after 2<sup>31</sup> events were handled by the server.
|
1
changelog.d/10288.doc
Normal file
1
changelog.d/10288.doc
Normal file
|
@ -0,0 +1 @@
|
|||
Fix homeserver config option name in presence router documentation.
|
6
debian/changelog
vendored
6
debian/changelog
vendored
|
@ -1,3 +1,9 @@
|
|||
matrix-synapse-py3 (1.37.1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.37.1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Wed, 30 Jun 2021 12:24:06 +0100
|
||||
|
||||
matrix-synapse-py3 (1.37.0) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.37.0.
|
||||
|
|
|
@ -222,7 +222,9 @@ Synapse, amend your homeserver config file with the following.
|
|||
|
||||
```yaml
|
||||
presence:
|
||||
routing_module:
|
||||
enabled: true
|
||||
|
||||
presence_router:
|
||||
module: my_module.ExamplePresenceRouter
|
||||
config:
|
||||
# Any configuration options for your module. The below is an example.
|
||||
|
|
|
@ -47,7 +47,7 @@ try:
|
|||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.37.1a1"
|
||||
__version__ = "1.37.1"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
import pymacaroons
|
||||
from netaddr import IPAddress
|
||||
|
@ -31,6 +31,7 @@ from synapse.api.errors import (
|
|||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.builder import EventBuilder
|
||||
from synapse.http import get_request_user_agent
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging import opentracing as opentracing
|
||||
|
@ -490,7 +491,7 @@ class Auth:
|
|||
|
||||
def compute_auth_events(
|
||||
self,
|
||||
event,
|
||||
event: Union[EventBase, EventBuilder],
|
||||
current_state_ids: StateMap[str],
|
||||
for_verification: bool = False,
|
||||
) -> List[str]:
|
||||
|
|
|
@ -201,6 +201,12 @@ class EventContentFields:
|
|||
)
|
||||
|
||||
|
||||
class RoomTypes:
|
||||
"""Understood values of the room_type field of m.room.create events."""
|
||||
|
||||
SPACE = "m.space"
|
||||
|
||||
|
||||
class RoomEncryptionAlgorithms:
|
||||
MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
|
||||
DEFAULT = MEGOLM_V1_AES_SHA2
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple, Union
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
|
@ -29,6 +29,7 @@ from synapse.api.room_versions import (
|
|||
RoomVersion,
|
||||
)
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.builder import EventBuilder
|
||||
from synapse.types import StateMap, UserID, get_domain_from_id
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -724,7 +725,7 @@ def get_public_keys(invite_event: EventBase) -> List[Dict[str, Any]]:
|
|||
return public_keys
|
||||
|
||||
|
||||
def auth_types_for_event(event: EventBase) -> Set[Tuple[str, str]]:
|
||||
def auth_types_for_event(event: Union[EventBase, EventBuilder]) -> Set[Tuple[str, str]]:
|
||||
"""Given an event, return a list of (EventType, StateKey) that may be
|
||||
needed to auth the event. The returned list may be a superset of what
|
||||
would actually be required depending on the full state of the room.
|
||||
|
|
|
@ -118,7 +118,7 @@ class _EventInternalMetadata:
|
|||
proactively_send = DictProperty("proactively_send") # type: bool
|
||||
redacted = DictProperty("redacted") # type: bool
|
||||
txn_id = DictProperty("txn_id") # type: str
|
||||
token_id = DictProperty("token_id") # type: str
|
||||
token_id = DictProperty("token_id") # type: int
|
||||
historical = DictProperty("historical") # type: bool
|
||||
|
||||
# XXX: These are set by StreamWorkerStore._set_before_and_after.
|
||||
|
|
|
@ -12,12 +12,11 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
import attr
|
||||
from nacl.signing import SigningKey
|
||||
|
||||
from synapse.api.auth import Auth
|
||||
from synapse.api.constants import MAX_DEPTH
|
||||
from synapse.api.errors import UnsupportedRoomVersionError
|
||||
from synapse.api.room_versions import (
|
||||
|
@ -34,10 +33,14 @@ from synapse.types import EventID, JsonDict
|
|||
from synapse.util import Clock
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.api.auth import Auth
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@attr.s(slots=True, cmp=False, frozen=True)
|
||||
@attr.s(slots=True, cmp=False, frozen=True, auto_attribs=True)
|
||||
class EventBuilder:
|
||||
"""A format independent event builder used to build up the event content
|
||||
before signing the event.
|
||||
|
@ -62,31 +65,30 @@ class EventBuilder:
|
|||
_signing_key: The signing key to use to sign the event as the server
|
||||
"""
|
||||
|
||||
_state = attr.ib(type=StateHandler)
|
||||
_auth = attr.ib(type=Auth)
|
||||
_store = attr.ib(type=DataStore)
|
||||
_clock = attr.ib(type=Clock)
|
||||
_hostname = attr.ib(type=str)
|
||||
_signing_key = attr.ib(type=SigningKey)
|
||||
_state: StateHandler
|
||||
_auth: "Auth"
|
||||
_store: DataStore
|
||||
_clock: Clock
|
||||
_hostname: str
|
||||
_signing_key: SigningKey
|
||||
|
||||
room_version = attr.ib(type=RoomVersion)
|
||||
room_version: RoomVersion
|
||||
|
||||
room_id = attr.ib(type=str)
|
||||
type = attr.ib(type=str)
|
||||
sender = attr.ib(type=str)
|
||||
room_id: str
|
||||
type: str
|
||||
sender: str
|
||||
|
||||
content = attr.ib(default=attr.Factory(dict), type=JsonDict)
|
||||
unsigned = attr.ib(default=attr.Factory(dict), type=JsonDict)
|
||||
content: JsonDict = attr.Factory(dict)
|
||||
unsigned: JsonDict = attr.Factory(dict)
|
||||
|
||||
# These only exist on a subset of events, so they raise AttributeError if
|
||||
# someone tries to get them when they don't exist.
|
||||
_state_key = attr.ib(default=None, type=Optional[str])
|
||||
_redacts = attr.ib(default=None, type=Optional[str])
|
||||
_origin_server_ts = attr.ib(default=None, type=Optional[int])
|
||||
_state_key: Optional[str] = None
|
||||
_redacts: Optional[str] = None
|
||||
_origin_server_ts: Optional[int] = None
|
||||
|
||||
internal_metadata = attr.ib(
|
||||
default=attr.Factory(lambda: _EventInternalMetadata({})),
|
||||
type=_EventInternalMetadata,
|
||||
internal_metadata: _EventInternalMetadata = attr.Factory(
|
||||
lambda: _EventInternalMetadata({})
|
||||
)
|
||||
|
||||
@property
|
||||
|
@ -184,7 +186,7 @@ class EventBuilder:
|
|||
|
||||
|
||||
class EventBuilderFactory:
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.clock = hs.get_clock()
|
||||
self.hostname = hs.hostname
|
||||
self.signing_key = hs.signing_key
|
||||
|
@ -193,15 +195,14 @@ class EventBuilderFactory:
|
|||
self.state = hs.get_state_handler()
|
||||
self.auth = hs.get_auth()
|
||||
|
||||
def new(self, room_version, key_values):
|
||||
def new(self, room_version: str, key_values: dict) -> EventBuilder:
|
||||
"""Generate an event builder appropriate for the given room version
|
||||
|
||||
Deprecated: use for_room_version with a RoomVersion object instead
|
||||
|
||||
Args:
|
||||
room_version (str): Version of the room that we're creating an event builder
|
||||
for
|
||||
key_values (dict): Fields used as the basis of the new event
|
||||
room_version: Version of the room that we're creating an event builder for
|
||||
key_values: Fields used as the basis of the new event
|
||||
|
||||
Returns:
|
||||
EventBuilder
|
||||
|
@ -212,13 +213,15 @@ class EventBuilderFactory:
|
|||
raise UnsupportedRoomVersionError()
|
||||
return self.for_room_version(v, key_values)
|
||||
|
||||
def for_room_version(self, room_version, key_values):
|
||||
def for_room_version(
|
||||
self, room_version: RoomVersion, key_values: dict
|
||||
) -> EventBuilder:
|
||||
"""Generate an event builder appropriate for the given room version
|
||||
|
||||
Args:
|
||||
room_version (synapse.api.room_versions.RoomVersion):
|
||||
room_version:
|
||||
Version of the room that we're creating an event builder for
|
||||
key_values (dict): Fields used as the basis of the new event
|
||||
key_values: Fields used as the basis of the new event
|
||||
|
||||
Returns:
|
||||
EventBuilder
|
||||
|
@ -286,15 +289,15 @@ def create_local_event_from_event_dict(
|
|||
_event_id_counter = 0
|
||||
|
||||
|
||||
def _create_event_id(clock, hostname):
|
||||
def _create_event_id(clock: Clock, hostname: str) -> str:
|
||||
"""Create a new event ID
|
||||
|
||||
Args:
|
||||
clock (Clock)
|
||||
hostname (str): The server name for the event ID
|
||||
clock
|
||||
hostname: The server name for the event ID
|
||||
|
||||
Returns:
|
||||
str
|
||||
The new event ID
|
||||
"""
|
||||
|
||||
global _event_id_counter
|
||||
|
|
|
@ -369,22 +369,21 @@ class FederationServer(FederationBase):
|
|||
|
||||
async def process_pdu(pdu: EventBase) -> JsonDict:
|
||||
event_id = pdu.event_id
|
||||
with pdu_process_time.time():
|
||||
with nested_logging_context(event_id):
|
||||
try:
|
||||
await self._handle_received_pdu(origin, pdu)
|
||||
return {}
|
||||
except FederationError as e:
|
||||
logger.warning("Error handling PDU %s: %s", event_id, e)
|
||||
return {"error": str(e)}
|
||||
except Exception as e:
|
||||
f = failure.Failure()
|
||||
logger.error(
|
||||
"Failed to handle PDU %s",
|
||||
event_id,
|
||||
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
|
||||
)
|
||||
return {"error": str(e)}
|
||||
with nested_logging_context(event_id):
|
||||
try:
|
||||
await self._handle_received_pdu(origin, pdu)
|
||||
return {}
|
||||
except FederationError as e:
|
||||
logger.warning("Error handling PDU %s: %s", event_id, e)
|
||||
return {"error": str(e)}
|
||||
except Exception as e:
|
||||
f = failure.Failure()
|
||||
logger.error(
|
||||
"Failed to handle PDU %s",
|
||||
event_id,
|
||||
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
|
||||
)
|
||||
return {"error": str(e)}
|
||||
|
||||
await concurrently_execute(
|
||||
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
|
||||
|
@ -932,9 +931,13 @@ class FederationServer(FederationBase):
|
|||
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
|
||||
)
|
||||
|
||||
await self.store.remove_received_event_from_staging(
|
||||
received_ts = await self.store.remove_received_event_from_staging(
|
||||
origin, event.event_id
|
||||
)
|
||||
if received_ts is not None:
|
||||
pdu_process_time.observe(
|
||||
(self._clock.time_msec() - received_ts) / 1000
|
||||
)
|
||||
|
||||
# We need to do this check outside the lock to avoid a race between
|
||||
# a new event being inserted by another instance and it attempting
|
||||
|
|
|
@ -509,6 +509,8 @@ class EventCreationHandler:
|
|||
Should normally be left as None, which will cause them to be calculated
|
||||
based on the room state at the prev_events.
|
||||
|
||||
If non-None, prev_event_ids must also be provided.
|
||||
|
||||
require_consent: Whether to check if the requester has
|
||||
consented to the privacy policy.
|
||||
|
||||
|
@ -581,6 +583,9 @@ class EventCreationHandler:
|
|||
# Strip down the auth_event_ids to only what we need to auth the event.
|
||||
# For example, we don't need extra m.room.member that don't match event.sender
|
||||
if auth_event_ids is not None:
|
||||
# If auth events are provided, prev events must be also.
|
||||
assert prev_event_ids is not None
|
||||
|
||||
temp_event = await builder.build(
|
||||
prev_event_ids=prev_event_ids,
|
||||
auth_event_ids=auth_event_ids,
|
||||
|
@ -784,6 +789,8 @@ class EventCreationHandler:
|
|||
The event ids to use as the auth_events for the new event.
|
||||
Should normally be left as None, which will cause them to be calculated
|
||||
based on the room state at the prev_events.
|
||||
|
||||
If non-None, prev_event_ids must also be provided.
|
||||
ratelimit: Whether to rate limit this send.
|
||||
txn_id: The transaction ID.
|
||||
ignore_shadow_ban: True if shadow-banned users should be allowed to
|
||||
|
|
|
@ -25,6 +25,7 @@ from synapse.api.constants import (
|
|||
EventTypes,
|
||||
HistoryVisibility,
|
||||
Membership,
|
||||
RoomTypes,
|
||||
)
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.utils import format_event_for_client_v2
|
||||
|
@ -318,7 +319,8 @@ class SpaceSummaryHandler:
|
|||
|
||||
Returns:
|
||||
A tuple of:
|
||||
An iterable of a single value of the room.
|
||||
The room information, if the room should be returned to the
|
||||
user. None, otherwise.
|
||||
|
||||
An iterable of the sorted children events. This may be limited
|
||||
to a maximum size or may include all children.
|
||||
|
@ -328,7 +330,11 @@ class SpaceSummaryHandler:
|
|||
|
||||
room_entry = await self._build_room_entry(room_id)
|
||||
|
||||
# look for child rooms/spaces.
|
||||
# If the room is not a space, return just the room information.
|
||||
if room_entry.get("room_type") != RoomTypes.SPACE:
|
||||
return room_entry, ()
|
||||
|
||||
# Otherwise, look for child rooms/spaces.
|
||||
child_events = await self._get_child_events(room_id)
|
||||
|
||||
if suggested_only:
|
||||
|
@ -348,6 +354,7 @@ class SpaceSummaryHandler:
|
|||
event_format=format_event_for_client_v2,
|
||||
)
|
||||
)
|
||||
|
||||
return room_entry, events_result
|
||||
|
||||
async def _summarize_remote_room(
|
||||
|
|
|
@ -1075,16 +1075,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
self,
|
||||
origin: str,
|
||||
event_id: str,
|
||||
) -> None:
|
||||
"""Remove the given event from the staging area"""
|
||||
await self.db_pool.simple_delete(
|
||||
table="federation_inbound_events_staging",
|
||||
keyvalues={
|
||||
"origin": origin,
|
||||
"event_id": event_id,
|
||||
},
|
||||
desc="remove_received_event_from_staging",
|
||||
)
|
||||
) -> Optional[int]:
|
||||
"""Remove the given event from the staging area.
|
||||
|
||||
Returns:
|
||||
The received_ts of the row that was deleted, if any.
|
||||
"""
|
||||
if self.db_pool.engine.supports_returning:
|
||||
|
||||
def _remove_received_event_from_staging_txn(txn):
|
||||
sql = """
|
||||
DELETE FROM federation_inbound_events_staging
|
||||
WHERE origin = ? AND event_id = ?
|
||||
RETURNING received_ts
|
||||
"""
|
||||
|
||||
txn.execute(sql, (origin, event_id))
|
||||
return txn.fetchone()
|
||||
|
||||
row = await self.db_pool.runInteraction(
|
||||
"remove_received_event_from_staging",
|
||||
_remove_received_event_from_staging_txn,
|
||||
db_autocommit=True,
|
||||
)
|
||||
if row is None:
|
||||
return None
|
||||
|
||||
return row[0]
|
||||
|
||||
else:
|
||||
|
||||
def _remove_received_event_from_staging_txn(txn):
|
||||
received_ts = self.db_pool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="federation_inbound_events_staging",
|
||||
keyvalues={
|
||||
"origin": origin,
|
||||
"event_id": event_id,
|
||||
},
|
||||
retcol="received_ts",
|
||||
allow_none=True,
|
||||
)
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn,
|
||||
table="federation_inbound_events_staging",
|
||||
keyvalues={
|
||||
"origin": origin,
|
||||
"event_id": event_id,
|
||||
},
|
||||
)
|
||||
|
||||
return received_ts
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"remove_received_event_from_staging",
|
||||
_remove_received_event_from_staging_txn,
|
||||
)
|
||||
|
||||
async def get_next_staged_event_id_for_room(
|
||||
self,
|
||||
|
|
|
@ -29,13 +29,18 @@ from synapse.types import JsonDict
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
|
||||
_REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
|
||||
# there should be no leftover rows without a stream_ordering2, but just in case...
|
||||
"UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
|
||||
# finally, we can drop the rule and switch the columns
|
||||
# now we can drop the rule and switch the columns
|
||||
"DROP RULE populate_stream_ordering2 ON events",
|
||||
"ALTER TABLE events DROP COLUMN stream_ordering",
|
||||
"ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
|
||||
# ... and finally, rename the indexes into place for consistency with sqlite
|
||||
"ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index",
|
||||
"ALTER INDEX events_order_room2 RENAME TO events_order_room",
|
||||
"ALTER INDEX events_room_stream2 RENAME TO events_room_stream",
|
||||
"ALTER INDEX events_ts2 RENAME TO events_ts",
|
||||
)
|
||||
|
||||
|
||||
|
@ -45,6 +50,10 @@ class _BackgroundUpdates:
|
|||
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
|
||||
POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
|
||||
INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
|
||||
INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
|
||||
INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
|
||||
INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
|
||||
INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
|
||||
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
|
||||
|
||||
|
||||
|
@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
|||
self._purged_chain_cover_index,
|
||||
)
|
||||
|
||||
################################################################################
|
||||
|
||||
# bg updates for replacing stream_ordering with a BIGINT
|
||||
# (these only run on postgres.)
|
||||
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
|
||||
self._background_populate_stream_ordering2,
|
||||
)
|
||||
# CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2);
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
_BackgroundUpdates.INDEX_STREAM_ORDERING2,
|
||||
index_name="events_stream_ordering",
|
||||
|
@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
|||
columns=["stream_ordering2"],
|
||||
unique=True,
|
||||
)
|
||||
# CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false;
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
_BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL,
|
||||
index_name="event_contains_url_index2",
|
||||
table="events",
|
||||
columns=["room_id", "topological_ordering", "stream_ordering2"],
|
||||
where_clause="contains_url = true AND outlier = false",
|
||||
)
|
||||
# CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering);
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
_BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER,
|
||||
index_name="events_order_room2",
|
||||
table="events",
|
||||
columns=["room_id", "topological_ordering", "stream_ordering2"],
|
||||
)
|
||||
# CREATE INDEX events_room_stream ON events(room_id, stream_ordering);
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
_BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM,
|
||||
index_name="events_room_stream2",
|
||||
table="events",
|
||||
columns=["room_id", "stream_ordering2"],
|
||||
)
|
||||
# CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
_BackgroundUpdates.INDEX_STREAM_ORDERING2_TS,
|
||||
index_name="events_ts2",
|
||||
table="events",
|
||||
columns=["origin_server_ts", "stream_ordering2"],
|
||||
)
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
_BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
|
||||
self._background_replace_stream_ordering_column,
|
||||
)
|
||||
|
||||
################################################################################
|
||||
|
||||
async def _background_reindex_fields_sender(self, progress, batch_size):
|
||||
target_min_stream_id = progress["target_min_stream_id_inclusive"]
|
||||
max_stream_id = progress["max_stream_id_exclusive"]
|
||||
|
@ -1098,7 +1142,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
|||
"""Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""
|
||||
|
||||
def process(txn: Cursor) -> None:
|
||||
for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
|
||||
for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS:
|
||||
logger.info("completing stream_ordering migration: %s", sql)
|
||||
txn.execute(sql)
|
||||
|
||||
|
|
|
@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
|
|||
"""
|
||||
...
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def supports_returning(self) -> bool:
|
||||
"""Do we support the `RETURNING` clause in insert/update/delete?"""
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def check_database(
|
||||
self, db_conn: ConnectionType, allow_outdated_version: bool = False
|
||||
|
|
|
@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine):
|
|||
"""Do we support using `a = ANY(?)` and passing a list"""
|
||||
return True
|
||||
|
||||
@property
|
||||
def supports_returning(self) -> bool:
|
||||
"""Do we support the `RETURNING` clause in insert/update/delete?"""
|
||||
return True
|
||||
|
||||
def is_deadlock(self, error):
|
||||
if isinstance(error, self.module.DatabaseError):
|
||||
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
|
||||
|
|
|
@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
|
|||
"""Do we support using `a = ANY(?)` and passing a list"""
|
||||
return False
|
||||
|
||||
@property
|
||||
def supports_returning(self) -> bool:
|
||||
"""Do we support the `RETURNING` clause in insert/update/delete?"""
|
||||
return self.module.sqlite_version_info >= (3, 35, 0)
|
||||
|
||||
def check_database(self, db_conn, allow_outdated_version: bool = False):
|
||||
if not allow_outdated_version:
|
||||
version = self.module.sqlite_version_info
|
||||
|
|
|
@ -31,10 +31,15 @@ CREATE OR REPLACE RULE "populate_stream_ordering2" AS
|
|||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(6001, 'populate_stream_ordering2', '{}');
|
||||
|
||||
-- ... and another to build an index on it
|
||||
-- ... and some more to build indexes on it. These aren't really interdependent
|
||||
-- but the backround_updates manager can only handle a single dependency per update.
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
|
||||
(6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');
|
||||
(6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'),
|
||||
(6001, 'index_stream_ordering2_room_order', '{}', 'index_stream_ordering2'),
|
||||
(6001, 'index_stream_ordering2_contains_url', '{}', 'index_stream_ordering2_room_order'),
|
||||
(6001, 'index_stream_ordering2_room_stream', '{}', 'index_stream_ordering2_contains_url'),
|
||||
(6001, 'index_stream_ordering2_ts', '{}', 'index_stream_ordering2_room_stream');
|
||||
|
||||
-- ... and another to do the switcheroo
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
|
||||
(6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');
|
||||
(6003, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2_ts');
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- This migration is closely related to '01recreate_stream_ordering.sql.postgres'.
|
||||
--
|
||||
-- It updates the other tables which use an INTEGER to refer to a stream ordering.
|
||||
-- These tables are all small enough that a re-create is tractable.
|
||||
ALTER TABLE pushers ALTER COLUMN last_stream_ordering SET DATA TYPE BIGINT;
|
||||
ALTER TABLE federation_stream_position ALTER COLUMN stream_id SET DATA TYPE BIGINT;
|
||||
|
||||
-- these aren't actually event stream orderings, but they are numbers where 2 billion
|
||||
-- is a bit limiting, application_services_state is tiny, and I don't want to ever have
|
||||
-- to do this again.
|
||||
ALTER TABLE application_services_state ALTER COLUMN last_txn SET DATA TYPE BIGINT;
|
||||
ALTER TABLE application_services_state ALTER COLUMN read_receipt_stream_id SET DATA TYPE BIGINT;
|
||||
ALTER TABLE application_services_state ALTER COLUMN presence_stream_id SET DATA TYPE BIGINT;
|
||||
|
||||
|
|
@ -14,6 +14,7 @@
|
|||
from typing import Any, Iterable, Optional, Tuple
|
||||
from unittest import mock
|
||||
|
||||
from synapse.api.constants import EventContentFields, RoomTypes
|
||||
from synapse.api.errors import AuthError
|
||||
from synapse.handlers.space_summary import _child_events_comparison_key
|
||||
from synapse.rest import admin
|
||||
|
@ -97,9 +98,21 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase):
|
|||
self.hs = hs
|
||||
self.handler = self.hs.get_space_summary_handler()
|
||||
|
||||
# Create a user.
|
||||
self.user = self.register_user("user", "pass")
|
||||
self.token = self.login("user", "pass")
|
||||
|
||||
# Create a space and a child room.
|
||||
self.space = self.helper.create_room_as(
|
||||
self.user,
|
||||
tok=self.token,
|
||||
extra_content={
|
||||
"creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE}
|
||||
},
|
||||
)
|
||||
self.room = self.helper.create_room_as(self.user, tok=self.token)
|
||||
self._add_child(self.space, self.room, self.token)
|
||||
|
||||
def _add_child(self, space_id: str, room_id: str, token: str) -> None:
|
||||
"""Add a child room to a space."""
|
||||
self.helper.send_state(
|
||||
|
@ -128,43 +141,32 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase):
|
|||
|
||||
def test_simple_space(self):
|
||||
"""Test a simple space with a single room."""
|
||||
space = self.helper.create_room_as(self.user, tok=self.token)
|
||||
room = self.helper.create_room_as(self.user, tok=self.token)
|
||||
self._add_child(space, room, self.token)
|
||||
|
||||
result = self.get_success(self.handler.get_space_summary(self.user, space))
|
||||
result = self.get_success(self.handler.get_space_summary(self.user, self.space))
|
||||
# The result should have the space and the room in it, along with a link
|
||||
# from space -> room.
|
||||
self._assert_rooms(result, [space, room])
|
||||
self._assert_events(result, [(space, room)])
|
||||
self._assert_rooms(result, [self.space, self.room])
|
||||
self._assert_events(result, [(self.space, self.room)])
|
||||
|
||||
def test_visibility(self):
|
||||
"""A user not in a space cannot inspect it."""
|
||||
space = self.helper.create_room_as(self.user, tok=self.token)
|
||||
room = self.helper.create_room_as(self.user, tok=self.token)
|
||||
self._add_child(space, room, self.token)
|
||||
|
||||
user2 = self.register_user("user2", "pass")
|
||||
token2 = self.login("user2", "pass")
|
||||
|
||||
# The user cannot see the space.
|
||||
self.get_failure(self.handler.get_space_summary(user2, space), AuthError)
|
||||
self.get_failure(self.handler.get_space_summary(user2, self.space), AuthError)
|
||||
|
||||
# Joining the room causes it to be visible.
|
||||
self.helper.join(space, user2, tok=token2)
|
||||
result = self.get_success(self.handler.get_space_summary(user2, space))
|
||||
self.helper.join(self.space, user2, tok=token2)
|
||||
result = self.get_success(self.handler.get_space_summary(user2, self.space))
|
||||
|
||||
# The result should only have the space, but includes the link to the room.
|
||||
self._assert_rooms(result, [space])
|
||||
self._assert_events(result, [(space, room)])
|
||||
self._assert_rooms(result, [self.space])
|
||||
self._assert_events(result, [(self.space, self.room)])
|
||||
|
||||
def test_world_readable(self):
|
||||
"""A world-readable room is visible to everyone."""
|
||||
space = self.helper.create_room_as(self.user, tok=self.token)
|
||||
room = self.helper.create_room_as(self.user, tok=self.token)
|
||||
self._add_child(space, room, self.token)
|
||||
self.helper.send_state(
|
||||
space,
|
||||
self.space,
|
||||
event_type="m.room.history_visibility",
|
||||
body={"history_visibility": "world_readable"},
|
||||
tok=self.token,
|
||||
|
@ -173,6 +175,6 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase):
|
|||
user2 = self.register_user("user2", "pass")
|
||||
|
||||
# The space should be visible, as well as the link to the room.
|
||||
result = self.get_success(self.handler.get_space_summary(user2, space))
|
||||
self._assert_rooms(result, [space])
|
||||
self._assert_events(result, [(space, room)])
|
||||
result = self.get_success(self.handler.get_space_summary(user2, self.space))
|
||||
self._assert_rooms(result, [self.space])
|
||||
self._assert_events(result, [(self.space, self.room)])
|
||||
|
|
|
@ -52,6 +52,7 @@ class RestHelper:
|
|||
room_version: str = None,
|
||||
tok: str = None,
|
||||
expect_code: int = 200,
|
||||
extra_content: Optional[Dict] = None,
|
||||
) -> str:
|
||||
"""
|
||||
Create a room.
|
||||
|
@ -72,7 +73,7 @@ class RestHelper:
|
|||
temp_id = self.auth_user_id
|
||||
self.auth_user_id = room_creator
|
||||
path = "/_matrix/client/r0/createRoom"
|
||||
content = {}
|
||||
content = extra_content or {}
|
||||
if not is_public:
|
||||
content["visibility"] = "private"
|
||||
if room_version:
|
||||
|
|
Loading…
Reference in a new issue