Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
This commit is contained in:
commit
a8f48246b6
18
CHANGES.md
18
CHANGES.md
|
@ -1,3 +1,21 @@
|
|||
Synapse 1.33.0 (2021-05-05)
|
||||
===========================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Build Debian packages for Ubuntu 21.04 (Hirsute Hippo). ([\#9909](https://github.com/matrix-org/synapse/issues/9909))
|
||||
|
||||
|
||||
Synapse 1.33.0rc2 (2021-04-29)
|
||||
==============================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix tight loop when handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](https://github.com/matrix-org/synapse/issues/9900))
|
||||
|
||||
|
||||
Synapse 1.33.0rc1 (2021-04-28)
|
||||
==============================
|
||||
|
||||
|
|
1
changelog.d/9881.feature
Normal file
1
changelog.d/9881.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add experimental option to track memory usage of the caches.
|
1
changelog.d/9885.misc
Normal file
1
changelog.d/9885.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add type hints to presence handler.
|
1
changelog.d/9886.misc
Normal file
1
changelog.d/9886.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Reduce memory usage of the LRU caches.
|
1
changelog.d/9889.feature
Normal file
1
changelog.d/9889.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add support for `DELETE /_synapse/admin/v1/rooms/<room_id>`.
|
1
changelog.d/9889.removal
Normal file
1
changelog.d/9889.removal
Normal file
|
@ -0,0 +1 @@
|
|||
Mark as deprecated `POST /_synapse/admin/v1/rooms/<room_id>/delete`.
|
1
changelog.d/9895.bugfix
Normal file
1
changelog.d/9895.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix a bug introduced in v1.32.0 where the associated connection was improperly logged for SQL logging statements.
|
1
changelog.d/9896.bugfix
Normal file
1
changelog.d/9896.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Correct the type hint for the `user_may_create_room_alias` method of spam checkers. It is provided a `RoomAlias`, not a `str`.
|
1
changelog.d/9896.misc
Normal file
1
changelog.d/9896.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add type hints to the `synapse.handlers` module.
|
|
@ -1 +0,0 @@
|
|||
Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1.
|
1
changelog.d/9902.feature
Normal file
1
changelog.d/9902.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add limits to how often Synapse will GC, ensuring that large servers do not end up GC thrashing if `gc_thresholds` has not been correctly set.
|
1
changelog.d/9904.misc
Normal file
1
changelog.d/9904.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Time response time for external cache requests.
|
1
changelog.d/9905.feature
Normal file
1
changelog.d/9905.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Improve performance of sending events for worker-based deployments using Redis.
|
1
changelog.d/9910.bugfix
Normal file
1
changelog.d/9910.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession.
|
1
changelog.d/9910.feature
Normal file
1
changelog.d/9910.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Improve performance after joining a large room when presence is enabled.
|
1
changelog.d/9911.doc
Normal file
1
changelog.d/9911.doc
Normal file
|
@ -0,0 +1 @@
|
|||
Add `port` argument to the Postgres database sample config section.
|
1
changelog.d/9913.docker
Normal file
1
changelog.d/9913.docker
Normal file
|
@ -0,0 +1 @@
|
|||
Added startup_delay to docker healthcheck to reduce waiting time for coming online, updated readme for extra options, contributed by @Maquis196.
|
1
changelog.d/9915.feature
Normal file
1
changelog.d/9915.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Support stable identifiers from [MSC1772](https://github.com/matrix-org/matrix-doc/pull/1772).
|
1
changelog.d/9916.feature
Normal file
1
changelog.d/9916.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Improve performance after joining a large room when presence is enabled.
|
1
changelog.d/9919.feature
Normal file
1
changelog.d/9919.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Omit empty fields from the `/sync` response. Contributed by @deepbluev7.
|
1
changelog.d/9928.bugfix
Normal file
1
changelog.d/9928.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Include the `origin_server_ts` property in the experimental [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946) support to allow clients to properly sort rooms.
|
6
debian/changelog
vendored
6
debian/changelog
vendored
|
@ -1,3 +1,9 @@
|
|||
matrix-synapse-py3 (1.33.0) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.33.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Wed, 05 May 2021 14:15:27 +0100
|
||||
|
||||
matrix-synapse-py3 (1.32.2) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.32.2.
|
||||
|
|
|
@ -88,5 +88,5 @@ EXPOSE 8008/tcp 8009/tcp 8448/tcp
|
|||
|
||||
ENTRYPOINT ["/start.py"]
|
||||
|
||||
HEALTHCHECK --interval=1m --timeout=5s \
|
||||
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
|
||||
CMD curl -fSs http://localhost:8008/health || exit 1
|
||||
|
|
|
@ -191,6 +191,16 @@ whilst running the above `docker run` commands.
|
|||
```
|
||||
--no-healthcheck
|
||||
```
|
||||
|
||||
## Disabling the healthcheck in docker-compose file
|
||||
|
||||
If you wish to disable the healthcheck via docker-compose, append the following to your service configuration.
|
||||
|
||||
```
|
||||
healthcheck:
|
||||
disable: true
|
||||
```
|
||||
|
||||
## Setting custom healthcheck on docker run
|
||||
|
||||
If you wish to point the healthcheck at a different port with docker command, add the following
|
||||
|
@ -202,14 +212,15 @@ If you wish to point the healthcheck at a different port with docker command, ad
|
|||
## Setting the healthcheck in docker-compose file
|
||||
|
||||
You can add the following to set a custom healthcheck in a docker compose file.
|
||||
You will need version >2.1 for this to work.
|
||||
You will need docker-compose version >2.1 for this to work.
|
||||
|
||||
```
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-fSs", "http://localhost:8008/health"]
|
||||
interval: 1m
|
||||
timeout: 10s
|
||||
interval: 15s
|
||||
timeout: 5s
|
||||
retries: 3
|
||||
start_period: 5s
|
||||
```
|
||||
|
||||
## Using jemalloc
|
||||
|
|
|
@ -427,7 +427,7 @@ the new room. Users on other servers will be unaffected.
|
|||
The API is:
|
||||
|
||||
```
|
||||
POST /_synapse/admin/v1/rooms/<room_id>/delete
|
||||
DELETE /_synapse/admin/v1/rooms/<room_id>
|
||||
```
|
||||
|
||||
with a body of:
|
||||
|
@ -528,6 +528,15 @@ You will have to manually handle, if you so choose, the following:
|
|||
* Users that would have been booted from the room (and will have been force-joined to the Content Violation room).
|
||||
* Removal of the Content Violation room if desired.
|
||||
|
||||
## Deprecated endpoint
|
||||
|
||||
The previous deprecated API will be removed in a future release, it was:
|
||||
|
||||
```
|
||||
POST /_synapse/admin/v1/rooms/<room_id>/delete
|
||||
```
|
||||
|
||||
It behaves the same way than the current endpoint except the path and the method.
|
||||
|
||||
# Make Room Admin API
|
||||
|
||||
|
|
|
@ -152,6 +152,16 @@ presence:
|
|||
#
|
||||
#gc_thresholds: [700, 10, 10]
|
||||
|
||||
# The minimum time in seconds between each GC for a generation, regardless of
|
||||
# the GC thresholds. This ensures that we don't do GC too frequently.
|
||||
#
|
||||
# A value of `[1s, 10s, 30s]` indicates that a second must pass between consecutive
|
||||
# generation 0 GCs, etc.
|
||||
#
|
||||
# Defaults to `[1s, 10s, 30s]`.
|
||||
#
|
||||
#gc_min_interval: [0.5s, 30s, 1m]
|
||||
|
||||
# Set the limit on the returned events in the timeline in the get
|
||||
# and sync operations. The default value is 100. -1 means no upper limit.
|
||||
#
|
||||
|
@ -810,6 +820,7 @@ caches:
|
|||
# password: secretpassword
|
||||
# database: synapse
|
||||
# host: localhost
|
||||
# port: 5432
|
||||
# cp_min: 5
|
||||
# cp_max: 10
|
||||
#
|
||||
|
|
3
mypy.ini
3
mypy.ini
|
@ -171,3 +171,6 @@ ignore_missing_imports = True
|
|||
|
||||
[mypy-txacme.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-pympler.*]
|
||||
ignore_missing_imports = True
|
||||
|
|
|
@ -21,9 +21,10 @@ DISTS = (
|
|||
"debian:buster",
|
||||
"debian:bullseye",
|
||||
"debian:sid",
|
||||
"ubuntu:bionic",
|
||||
"ubuntu:focal",
|
||||
"ubuntu:groovy",
|
||||
"ubuntu:bionic", # 18.04 LTS (our EOL forced by Py36 on 2021-12-23)
|
||||
"ubuntu:focal", # 20.04 LTS (our EOL forced by Py38 on 2024-10-14)
|
||||
"ubuntu:groovy", # 20.10 (EOL 2021-07-07)
|
||||
"ubuntu:hirsute", # 21.04 (EOL 2022-01-05)
|
||||
)
|
||||
|
||||
DESC = '''\
|
||||
|
|
|
@ -47,7 +47,7 @@ try:
|
|||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.33.0rc1"
|
||||
__version__ = "1.33.0"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
|
|
@ -110,6 +110,8 @@ class EventTypes:
|
|||
|
||||
Dummy = "org.matrix.dummy_event"
|
||||
|
||||
SpaceChild = "m.space.child"
|
||||
SpaceParent = "m.space.parent"
|
||||
MSC1772_SPACE_CHILD = "org.matrix.msc1772.space.child"
|
||||
MSC1772_SPACE_PARENT = "org.matrix.msc1772.space.parent"
|
||||
|
||||
|
@ -174,6 +176,7 @@ class EventContentFields:
|
|||
SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"
|
||||
|
||||
# cf https://github.com/matrix-org/matrix-doc/pull/1772
|
||||
ROOM_TYPE = "type"
|
||||
MSC1772_ROOM_TYPE = "org.matrix.msc1772.type"
|
||||
|
||||
|
||||
|
|
|
@ -454,6 +454,10 @@ def start(config_options):
|
|||
config.server.update_user_directory = False
|
||||
|
||||
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
|
||||
|
||||
if config.server.gc_seconds:
|
||||
synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds
|
||||
|
||||
hs = GenericWorkerServer(
|
||||
config.server_name,
|
||||
|
|
|
@ -341,6 +341,10 @@ def setup(config_options):
|
|||
sys.exit(0)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
|
||||
|
||||
if config.server.gc_seconds:
|
||||
synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds
|
||||
|
||||
hs = SynapseHomeServer(
|
||||
config.server_name,
|
||||
|
|
|
@ -17,6 +17,8 @@ import re
|
|||
import threading
|
||||
from typing import Callable, Dict
|
||||
|
||||
from synapse.python_dependencies import DependencyException, check_requirements
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
|
||||
# The prefix for all cache factor-related environment variables
|
||||
|
@ -189,6 +191,15 @@ class CacheConfig(Config):
|
|||
)
|
||||
self.cache_factors[cache] = factor
|
||||
|
||||
self.track_memory_usage = cache_config.get("track_memory_usage", False)
|
||||
if self.track_memory_usage:
|
||||
try:
|
||||
check_requirements("cache_memory")
|
||||
except DependencyException as e:
|
||||
raise ConfigError(
|
||||
e.message # noqa: B306, DependencyException.message is a property
|
||||
)
|
||||
|
||||
# Resize all caches (if necessary) with the new factors we've loaded
|
||||
self.resize_all_caches()
|
||||
|
||||
|
|
|
@ -58,6 +58,7 @@ DEFAULT_CONFIG = """\
|
|||
# password: secretpassword
|
||||
# database: synapse
|
||||
# host: localhost
|
||||
# port: 5432
|
||||
# cp_min: 5
|
||||
# cp_max: 10
|
||||
#
|
||||
|
|
|
@ -19,7 +19,7 @@ import logging
|
|||
import os.path
|
||||
import re
|
||||
from textwrap import indent
|
||||
from typing import Any, Dict, Iterable, List, Optional, Set
|
||||
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
import attr
|
||||
import yaml
|
||||
|
@ -572,6 +572,7 @@ class ServerConfig(Config):
|
|||
_warn_if_webclient_configured(self.listeners)
|
||||
|
||||
self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
|
||||
self.gc_seconds = self.read_gc_intervals(config.get("gc_min_interval", None))
|
||||
|
||||
@attr.s
|
||||
class LimitRemoteRoomsConfig:
|
||||
|
@ -917,6 +918,16 @@ class ServerConfig(Config):
|
|||
#
|
||||
#gc_thresholds: [700, 10, 10]
|
||||
|
||||
# The minimum time in seconds between each GC for a generation, regardless of
|
||||
# the GC thresholds. This ensures that we don't do GC too frequently.
|
||||
#
|
||||
# A value of `[1s, 10s, 30s]` indicates that a second must pass between consecutive
|
||||
# generation 0 GCs, etc.
|
||||
#
|
||||
# Defaults to `[1s, 10s, 30s]`.
|
||||
#
|
||||
#gc_min_interval: [0.5s, 30s, 1m]
|
||||
|
||||
# Set the limit on the returned events in the timeline in the get
|
||||
# and sync operations. The default value is 100. -1 means no upper limit.
|
||||
#
|
||||
|
@ -1305,6 +1316,24 @@ class ServerConfig(Config):
|
|||
help="Turn on the twisted telnet manhole service on the given port.",
|
||||
)
|
||||
|
||||
def read_gc_intervals(self, durations) -> Optional[Tuple[float, float, float]]:
|
||||
"""Reads the three durations for the GC min interval option, returning seconds."""
|
||||
if durations is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
if len(durations) != 3:
|
||||
raise ValueError()
|
||||
return (
|
||||
self.parse_duration(durations[0]) / 1000,
|
||||
self.parse_duration(durations[1]) / 1000,
|
||||
self.parse_duration(durations[2]) / 1000,
|
||||
)
|
||||
except Exception:
|
||||
raise ConfigError(
|
||||
"Value of `gc_min_interval` must be a list of three durations if set"
|
||||
)
|
||||
|
||||
|
||||
def is_threepid_reserved(reserved_threepids, threepid):
|
||||
"""Check the threepid against the reserved threepid config
|
||||
|
|
|
@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple,
|
|||
from synapse.rest.media.v1._base import FileInfo
|
||||
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
|
||||
from synapse.spam_checker_api import RegistrationBehaviour
|
||||
from synapse.types import RoomAlias
|
||||
from synapse.util.async_helpers import maybe_awaitable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -113,7 +114,9 @@ class SpamChecker:
|
|||
|
||||
return True
|
||||
|
||||
async def user_may_create_room_alias(self, userid: str, room_alias: str) -> bool:
|
||||
async def user_may_create_room_alias(
|
||||
self, userid: str, room_alias: RoomAlias
|
||||
) -> bool:
|
||||
"""Checks if a given user may create a room alias
|
||||
|
||||
If this method returns false, the association request will be rejected.
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
|
||||
import logging
|
||||
import string
|
||||
from typing import Iterable, List, Optional
|
||||
from typing import TYPE_CHECKING, Iterable, List, Optional
|
||||
|
||||
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
|
||||
from synapse.api.errors import (
|
||||
|
@ -27,15 +27,19 @@ from synapse.api.errors import (
|
|||
SynapseError,
|
||||
)
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.types import Requester, RoomAlias, UserID, get_domain_from_id
|
||||
from synapse.storage.databases.main.directory import RoomAliasMapping
|
||||
from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DirectoryHandler(BaseHandler):
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
|
||||
self.state = hs.get_state_handler()
|
||||
|
@ -60,7 +64,7 @@ class DirectoryHandler(BaseHandler):
|
|||
room_id: str,
|
||||
servers: Optional[Iterable[str]] = None,
|
||||
creator: Optional[str] = None,
|
||||
):
|
||||
) -> None:
|
||||
# general association creation for both human users and app services
|
||||
|
||||
for wchar in string.whitespace:
|
||||
|
@ -74,7 +78,7 @@ class DirectoryHandler(BaseHandler):
|
|||
# TODO(erikj): Add transactions.
|
||||
# TODO(erikj): Check if there is a current association.
|
||||
if not servers:
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
servers = {get_domain_from_id(u) for u in users}
|
||||
|
||||
if not servers:
|
||||
|
@ -104,8 +108,9 @@ class DirectoryHandler(BaseHandler):
|
|||
"""
|
||||
|
||||
user_id = requester.user.to_string()
|
||||
room_alias_str = room_alias.to_string()
|
||||
|
||||
if len(room_alias.to_string()) > MAX_ALIAS_LENGTH:
|
||||
if len(room_alias_str) > MAX_ALIAS_LENGTH:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Can't create aliases longer than %s characters" % MAX_ALIAS_LENGTH,
|
||||
|
@ -114,7 +119,7 @@ class DirectoryHandler(BaseHandler):
|
|||
|
||||
service = requester.app_service
|
||||
if service:
|
||||
if not service.is_interested_in_alias(room_alias.to_string()):
|
||||
if not service.is_interested_in_alias(room_alias_str):
|
||||
raise SynapseError(
|
||||
400,
|
||||
"This application service has not reserved this kind of alias.",
|
||||
|
@ -138,7 +143,7 @@ class DirectoryHandler(BaseHandler):
|
|||
raise AuthError(403, "This user is not permitted to create this alias")
|
||||
|
||||
if not self.config.is_alias_creation_allowed(
|
||||
user_id, room_id, room_alias.to_string()
|
||||
user_id, room_id, room_alias_str
|
||||
):
|
||||
# Lets just return a generic message, as there may be all sorts of
|
||||
# reasons why we said no. TODO: Allow configurable error messages
|
||||
|
@ -211,7 +216,7 @@ class DirectoryHandler(BaseHandler):
|
|||
|
||||
async def delete_appservice_association(
|
||||
self, service: ApplicationService, room_alias: RoomAlias
|
||||
):
|
||||
) -> None:
|
||||
if not service.is_interested_in_alias(room_alias.to_string()):
|
||||
raise SynapseError(
|
||||
400,
|
||||
|
@ -220,7 +225,7 @@ class DirectoryHandler(BaseHandler):
|
|||
)
|
||||
await self._delete_association(room_alias)
|
||||
|
||||
async def _delete_association(self, room_alias: RoomAlias):
|
||||
async def _delete_association(self, room_alias: RoomAlias) -> str:
|
||||
if not self.hs.is_mine(room_alias):
|
||||
raise SynapseError(400, "Room alias must be local")
|
||||
|
||||
|
@ -228,17 +233,19 @@ class DirectoryHandler(BaseHandler):
|
|||
|
||||
return room_id
|
||||
|
||||
async def get_association(self, room_alias: RoomAlias):
|
||||
async def get_association(self, room_alias: RoomAlias) -> JsonDict:
|
||||
room_id = None
|
||||
if self.hs.is_mine(room_alias):
|
||||
result = await self.get_association_from_room_alias(room_alias)
|
||||
result = await self.get_association_from_room_alias(
|
||||
room_alias
|
||||
) # type: Optional[RoomAliasMapping]
|
||||
|
||||
if result:
|
||||
room_id = result.room_id
|
||||
servers = result.servers
|
||||
else:
|
||||
try:
|
||||
result = await self.federation.make_query(
|
||||
fed_result = await self.federation.make_query(
|
||||
destination=room_alias.domain,
|
||||
query_type="directory",
|
||||
args={"room_alias": room_alias.to_string()},
|
||||
|
@ -248,13 +255,13 @@ class DirectoryHandler(BaseHandler):
|
|||
except CodeMessageException as e:
|
||||
logging.warning("Error retrieving alias")
|
||||
if e.code == 404:
|
||||
result = None
|
||||
fed_result = None
|
||||
else:
|
||||
raise
|
||||
|
||||
if result and "room_id" in result and "servers" in result:
|
||||
room_id = result["room_id"]
|
||||
servers = result["servers"]
|
||||
if fed_result and "room_id" in fed_result and "servers" in fed_result:
|
||||
room_id = fed_result["room_id"]
|
||||
servers = fed_result["servers"]
|
||||
|
||||
if not room_id:
|
||||
raise SynapseError(
|
||||
|
@ -263,7 +270,7 @@ class DirectoryHandler(BaseHandler):
|
|||
Codes.NOT_FOUND,
|
||||
)
|
||||
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
extra_servers = {get_domain_from_id(u) for u in users}
|
||||
servers = set(extra_servers) | set(servers)
|
||||
|
||||
|
@ -275,7 +282,7 @@ class DirectoryHandler(BaseHandler):
|
|||
|
||||
return {"room_id": room_id, "servers": servers}
|
||||
|
||||
async def on_directory_query(self, args):
|
||||
async def on_directory_query(self, args: JsonDict) -> JsonDict:
|
||||
room_alias = RoomAlias.from_string(args["room_alias"])
|
||||
if not self.hs.is_mine(room_alias):
|
||||
raise SynapseError(400, "Room Alias is not hosted on this homeserver")
|
||||
|
@ -293,7 +300,7 @@ class DirectoryHandler(BaseHandler):
|
|||
|
||||
async def _update_canonical_alias(
|
||||
self, requester: Requester, user_id: str, room_id: str, room_alias: RoomAlias
|
||||
):
|
||||
) -> None:
|
||||
"""
|
||||
Send an updated canonical alias event if the removed alias was set as
|
||||
the canonical alias or listed in the alt_aliases field.
|
||||
|
@ -344,7 +351,9 @@ class DirectoryHandler(BaseHandler):
|
|||
ratelimit=False,
|
||||
)
|
||||
|
||||
async def get_association_from_room_alias(self, room_alias: RoomAlias):
|
||||
async def get_association_from_room_alias(
|
||||
self, room_alias: RoomAlias
|
||||
) -> Optional[RoomAliasMapping]:
|
||||
result = await self.store.get_association_from_room_alias(room_alias)
|
||||
if not result:
|
||||
# Query AS to see if it exists
|
||||
|
@ -372,7 +381,7 @@ class DirectoryHandler(BaseHandler):
|
|||
# either no interested services, or no service with an exclusive lock
|
||||
return True
|
||||
|
||||
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
|
||||
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool:
|
||||
"""Determine whether a user can delete an alias.
|
||||
|
||||
One of the following must be true:
|
||||
|
@ -394,14 +403,13 @@ class DirectoryHandler(BaseHandler):
|
|||
if not room_id:
|
||||
return False
|
||||
|
||||
res = await self.auth.check_can_change_room_list(
|
||||
return await self.auth.check_can_change_room_list(
|
||||
room_id, UserID.from_string(user_id)
|
||||
)
|
||||
return res
|
||||
|
||||
async def edit_published_room_list(
|
||||
self, requester: Requester, room_id: str, visibility: str
|
||||
):
|
||||
) -> None:
|
||||
"""Edit the entry of the room in the published room list.
|
||||
|
||||
requester
|
||||
|
@ -469,7 +477,7 @@ class DirectoryHandler(BaseHandler):
|
|||
|
||||
async def edit_published_appservice_room_list(
|
||||
self, appservice_id: str, network_id: str, room_id: str, visibility: str
|
||||
):
|
||||
) -> None:
|
||||
"""Add or remove a room from the appservice/network specific public
|
||||
room list.
|
||||
|
||||
|
@ -499,5 +507,4 @@ class DirectoryHandler(BaseHandler):
|
|||
room_id, requester.user.to_string()
|
||||
)
|
||||
|
||||
aliases = await self.store.get_aliases_for_room(room_id)
|
||||
return aliases
|
||||
return await self.store.get_aliases_for_room(room_id)
|
||||
|
|
|
@ -103,7 +103,7 @@ class EventStreamHandler(BaseHandler):
|
|||
# Send down presence.
|
||||
if event.state_key == auth_user_id:
|
||||
# Send down presence for everyone in the room.
|
||||
users = await self.state.get_current_users_in_room(
|
||||
users = await self.store.get_users_in_room(
|
||||
event.room_id
|
||||
) # type: Iterable[str]
|
||||
else:
|
||||
|
|
|
@ -2446,7 +2446,9 @@ class FederationHandler(BaseHandler):
|
|||
# If we are going to send this event over federation we precaclculate
|
||||
# the joined hosts.
|
||||
if event.internal_metadata.get_send_on_behalf_of():
|
||||
await self.event_creation_handler.cache_joined_hosts_for_event(event)
|
||||
await self.event_creation_handler.cache_joined_hosts_for_event(
|
||||
event, context
|
||||
)
|
||||
|
||||
return context
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
"""Utilities for interacting with Identity Servers"""
|
||||
import logging
|
||||
import urllib.parse
|
||||
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException,
|
||||
|
@ -41,13 +41,16 @@ from synapse.util.stringutils import (
|
|||
|
||||
from ._base import BaseHandler
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
id_server_scheme = "https://"
|
||||
|
||||
|
||||
class IdentityHandler(BaseHandler):
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
|
||||
# An HTTP client for contacting trusted URLs.
|
||||
|
@ -80,7 +83,7 @@ class IdentityHandler(BaseHandler):
|
|||
request: SynapseRequest,
|
||||
medium: str,
|
||||
address: str,
|
||||
):
|
||||
) -> None:
|
||||
"""Used to ratelimit requests to `/requestToken` by IP and address.
|
||||
|
||||
Args:
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
# limitations under the License.
|
||||
import logging
|
||||
import random
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
|
@ -51,6 +51,7 @@ from synapse.storage.state import StateFilter
|
|||
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
|
||||
from synapse.util import json_decoder, json_encoder
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
|
@ -66,7 +67,7 @@ logger = logging.getLogger(__name__)
|
|||
class MessageHandler:
|
||||
"""Contains some read only APIs to get state about a room"""
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.auth = hs.get_auth()
|
||||
self.clock = hs.get_clock()
|
||||
self.state = hs.get_state_handler()
|
||||
|
@ -91,7 +92,7 @@ class MessageHandler:
|
|||
room_id: str,
|
||||
event_type: str,
|
||||
state_key: str,
|
||||
) -> dict:
|
||||
) -> Optional[EventBase]:
|
||||
"""Get data from a room.
|
||||
|
||||
Args:
|
||||
|
@ -115,6 +116,10 @@ class MessageHandler:
|
|||
data = await self.state.get_current_state(room_id, event_type, state_key)
|
||||
elif membership == Membership.LEAVE:
|
||||
key = (event_type, state_key)
|
||||
# If the membership is not JOIN, then the event ID should exist.
|
||||
assert (
|
||||
membership_event_id is not None
|
||||
), "check_user_in_room_or_world_readable returned invalid data"
|
||||
room_state = await self.state_store.get_state_for_events(
|
||||
[membership_event_id], StateFilter.from_types([key])
|
||||
)
|
||||
|
@ -186,10 +191,12 @@ class MessageHandler:
|
|||
|
||||
event = last_events[0]
|
||||
if visible_events:
|
||||
room_state = await self.state_store.get_state_for_events(
|
||||
room_state_events = await self.state_store.get_state_for_events(
|
||||
[event.event_id], state_filter=state_filter
|
||||
)
|
||||
room_state = room_state[event.event_id]
|
||||
room_state = room_state_events[
|
||||
event.event_id
|
||||
] # type: Mapping[Any, EventBase]
|
||||
else:
|
||||
raise AuthError(
|
||||
403,
|
||||
|
@ -210,10 +217,14 @@ class MessageHandler:
|
|||
)
|
||||
room_state = await self.store.get_events(state_ids.values())
|
||||
elif membership == Membership.LEAVE:
|
||||
room_state = await self.state_store.get_state_for_events(
|
||||
# If the membership is not JOIN, then the event ID should exist.
|
||||
assert (
|
||||
membership_event_id is not None
|
||||
), "check_user_in_room_or_world_readable returned invalid data"
|
||||
room_state_events = await self.state_store.get_state_for_events(
|
||||
[membership_event_id], state_filter=state_filter
|
||||
)
|
||||
room_state = room_state[membership_event_id]
|
||||
room_state = room_state_events[membership_event_id]
|
||||
|
||||
now = self.clock.time_msec()
|
||||
events = await self._event_serializer.serialize_events(
|
||||
|
@ -248,7 +259,7 @@ class MessageHandler:
|
|||
"Getting joined members after leaving is not implemented"
|
||||
)
|
||||
|
||||
users_with_profile = await self.state.get_current_users_in_room(room_id)
|
||||
users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)
|
||||
|
||||
# If this is an AS, double check that they are allowed to see the members.
|
||||
# This can either be because the AS user is in the room or because there
|
||||
|
@ -447,6 +458,19 @@ class EventCreationHandler:
|
|||
|
||||
self._external_cache = hs.get_external_cache()
|
||||
|
||||
# Stores the state groups we've recently added to the joined hosts
|
||||
# external cache. Note that the timeout must be significantly less than
|
||||
# the TTL on the external cache.
|
||||
self._external_cache_joined_hosts_updates = (
|
||||
None
|
||||
) # type: Optional[ExpiringCache]
|
||||
if self._external_cache.is_enabled():
|
||||
self._external_cache_joined_hosts_updates = ExpiringCache(
|
||||
"_external_cache_joined_hosts_updates",
|
||||
self.clock,
|
||||
expiry_ms=30 * 60 * 1000,
|
||||
)
|
||||
|
||||
async def create_event(
|
||||
self,
|
||||
requester: Requester,
|
||||
|
@ -957,7 +981,7 @@ class EventCreationHandler:
|
|||
|
||||
await self.action_generator.handle_push_actions_for_event(event, context)
|
||||
|
||||
await self.cache_joined_hosts_for_event(event)
|
||||
await self.cache_joined_hosts_for_event(event, context)
|
||||
|
||||
try:
|
||||
# If we're a worker we need to hit out to the master.
|
||||
|
@ -998,7 +1022,9 @@ class EventCreationHandler:
|
|||
await self.store.remove_push_actions_from_staging(event.event_id)
|
||||
raise
|
||||
|
||||
async def cache_joined_hosts_for_event(self, event: EventBase) -> None:
|
||||
async def cache_joined_hosts_for_event(
|
||||
self, event: EventBase, context: EventContext
|
||||
) -> None:
|
||||
"""Precalculate the joined hosts at the event, when using Redis, so that
|
||||
external federation senders don't have to recalculate it themselves.
|
||||
"""
|
||||
|
@ -1006,6 +1032,9 @@ class EventCreationHandler:
|
|||
if not self._external_cache.is_enabled():
|
||||
return
|
||||
|
||||
# If external cache is enabled we should always have this.
|
||||
assert self._external_cache_joined_hosts_updates is not None
|
||||
|
||||
# We actually store two mappings, event ID -> prev state group,
|
||||
# state group -> joined hosts, which is much more space efficient
|
||||
# than event ID -> joined hosts.
|
||||
|
@ -1013,16 +1042,21 @@ class EventCreationHandler:
|
|||
# Note: We have to cache event ID -> prev state group, as we don't
|
||||
# store that in the DB.
|
||||
#
|
||||
# Note: We always set the state group -> joined hosts cache, even if
|
||||
# we already set it, so that the expiry time is reset.
|
||||
# Note: We set the state group -> joined hosts cache if it hasn't been
|
||||
# set for a while, so that the expiry time is reset.
|
||||
|
||||
state_entry = await self.state.resolve_state_groups_for_events(
|
||||
event.room_id, event_ids=event.prev_event_ids()
|
||||
)
|
||||
|
||||
if state_entry.state_group:
|
||||
if state_entry.state_group in self._external_cache_joined_hosts_updates:
|
||||
return
|
||||
|
||||
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
|
||||
|
||||
# Note that the expiry times must be larger than the expiry time in
|
||||
# _external_cache_joined_hosts_updates.
|
||||
await self._external_cache.set(
|
||||
"event_to_prev_state_group",
|
||||
event.event_id,
|
||||
|
@ -1036,6 +1070,8 @@ class EventCreationHandler:
|
|||
expiry_ms=60 * 60 * 1000,
|
||||
)
|
||||
|
||||
self._external_cache_joined_hosts_updates[state_entry.state_group] = None
|
||||
|
||||
async def _validate_canonical_alias(
|
||||
self, directory_handler, room_alias_str: str, expected_room_id: str
|
||||
) -> None:
|
||||
|
|
|
@ -28,6 +28,7 @@ from bisect import bisect
|
|||
from contextlib import contextmanager
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Callable,
|
||||
Collection,
|
||||
Dict,
|
||||
FrozenSet,
|
||||
|
@ -232,23 +233,23 @@ class BasePresenceHandler(abc.ABC):
|
|||
"""
|
||||
|
||||
async def update_external_syncs_row(
|
||||
self, process_id, user_id, is_syncing, sync_time_msec
|
||||
):
|
||||
self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
|
||||
) -> None:
|
||||
"""Update the syncing users for an external process as a delta.
|
||||
|
||||
This is a no-op when presence is handled by a different worker.
|
||||
|
||||
Args:
|
||||
process_id (str): An identifier for the process the users are
|
||||
process_id: An identifier for the process the users are
|
||||
syncing against. This allows synapse to process updates
|
||||
as user start and stop syncing against a given process.
|
||||
user_id (str): The user who has started or stopped syncing
|
||||
is_syncing (bool): Whether or not the user is now syncing
|
||||
sync_time_msec(int): Time in ms when the user was last syncing
|
||||
user_id: The user who has started or stopped syncing
|
||||
is_syncing: Whether or not the user is now syncing
|
||||
sync_time_msec: Time in ms when the user was last syncing
|
||||
"""
|
||||
pass
|
||||
|
||||
async def update_external_syncs_clear(self, process_id):
|
||||
async def update_external_syncs_clear(self, process_id: str) -> None:
|
||||
"""Marks all users that had been marked as syncing by a given process
|
||||
as offline.
|
||||
|
||||
|
@ -304,7 +305,7 @@ class _NullContextManager(ContextManager[None]):
|
|||
|
||||
|
||||
class WorkerPresenceHandler(BasePresenceHandler):
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
self.hs = hs
|
||||
|
||||
|
@ -327,7 +328,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||
|
||||
# user_id -> last_sync_ms. Lists the users that have stopped syncing but
|
||||
# we haven't notified the presence writer of that yet
|
||||
self.users_going_offline = {}
|
||||
self.users_going_offline = {} # type: Dict[str, int]
|
||||
|
||||
self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
|
||||
self._set_state_client = ReplicationPresenceSetState.make_client(hs)
|
||||
|
@ -346,24 +347,21 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||
self._on_shutdown,
|
||||
)
|
||||
|
||||
def _on_shutdown(self):
|
||||
def _on_shutdown(self) -> None:
|
||||
if self._presence_enabled:
|
||||
self.hs.get_tcp_replication().send_command(
|
||||
ClearUserSyncsCommand(self.instance_id)
|
||||
)
|
||||
|
||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
||||
def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
|
||||
if self._presence_enabled:
|
||||
self.hs.get_tcp_replication().send_user_sync(
|
||||
self.instance_id, user_id, is_syncing, last_sync_ms
|
||||
)
|
||||
|
||||
def mark_as_coming_online(self, user_id):
|
||||
def mark_as_coming_online(self, user_id: str) -> None:
|
||||
"""A user has started syncing. Send a UserSync to the presence writer,
|
||||
unless they had recently stopped syncing.
|
||||
|
||||
Args:
|
||||
user_id (str)
|
||||
"""
|
||||
going_offline = self.users_going_offline.pop(user_id, None)
|
||||
if not going_offline:
|
||||
|
@ -371,18 +369,15 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||
# were offline
|
||||
self.send_user_sync(user_id, True, self.clock.time_msec())
|
||||
|
||||
def mark_as_going_offline(self, user_id):
|
||||
def mark_as_going_offline(self, user_id: str) -> None:
|
||||
"""A user has stopped syncing. We wait before notifying the presence
|
||||
writer as its likely they'll come back soon. This allows us to avoid
|
||||
sending a stopped syncing immediately followed by a started syncing
|
||||
notification to the presence writer
|
||||
|
||||
Args:
|
||||
user_id (str)
|
||||
"""
|
||||
self.users_going_offline[user_id] = self.clock.time_msec()
|
||||
|
||||
def send_stop_syncing(self):
|
||||
def send_stop_syncing(self) -> None:
|
||||
"""Check if there are any users who have stopped syncing a while ago and
|
||||
haven't come back yet. If there are poke the presence writer about them.
|
||||
"""
|
||||
|
@ -430,7 +425,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||
|
||||
return _user_syncing()
|
||||
|
||||
async def notify_from_replication(self, states, stream_id):
|
||||
async def notify_from_replication(
|
||||
self, states: List[UserPresenceState], stream_id: int
|
||||
) -> None:
|
||||
parties = await get_interested_parties(self.store, self.presence_router, states)
|
||||
room_ids_to_states, users_to_states = parties
|
||||
|
||||
|
@ -478,7 +475,12 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||
if count > 0
|
||||
]
|
||||
|
||||
async def set_state(self, target_user, state, ignore_status_msg=False):
|
||||
async def set_state(
|
||||
self,
|
||||
target_user: UserID,
|
||||
state: JsonDict,
|
||||
ignore_status_msg: bool = False,
|
||||
) -> None:
|
||||
"""Set the presence state of the user."""
|
||||
presence = state["presence"]
|
||||
|
||||
|
@ -508,7 +510,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||
ignore_status_msg=ignore_status_msg,
|
||||
)
|
||||
|
||||
async def bump_presence_active_time(self, user):
|
||||
async def bump_presence_active_time(self, user: UserID) -> None:
|
||||
"""We've seen the user do something that indicates they're interacting
|
||||
with the app.
|
||||
"""
|
||||
|
@ -592,8 +594,8 @@ class PresenceHandler(BasePresenceHandler):
|
|||
# we assume that all the sync requests on that process have stopped.
|
||||
# Stored as a dict from process_id to set of user_id, and a dict of
|
||||
# process_id to millisecond timestamp last updated.
|
||||
self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]]
|
||||
self.external_process_last_updated_ms = {} # type: Dict[int, int]
|
||||
self.external_process_to_current_syncs = {} # type: Dict[str, Set[str]]
|
||||
self.external_process_last_updated_ms = {} # type: Dict[str, int]
|
||||
|
||||
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
|
||||
|
||||
|
@ -633,7 +635,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
self._event_pos = self.store.get_current_events_token()
|
||||
self._event_processing = False
|
||||
|
||||
async def _on_shutdown(self):
|
||||
async def _on_shutdown(self) -> None:
|
||||
"""Gets called when shutting down. This lets us persist any updates that
|
||||
we haven't yet persisted, e.g. updates that only changes some internal
|
||||
timers. This allows changes to persist across startup without having to
|
||||
|
@ -662,7 +664,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
)
|
||||
logger.info("Finished _on_shutdown")
|
||||
|
||||
async def _persist_unpersisted_changes(self):
|
||||
async def _persist_unpersisted_changes(self) -> None:
|
||||
"""We periodically persist the unpersisted changes, as otherwise they
|
||||
may stack up and slow down shutdown times.
|
||||
"""
|
||||
|
@ -762,7 +764,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
states, destinations
|
||||
)
|
||||
|
||||
async def _handle_timeouts(self):
|
||||
async def _handle_timeouts(self) -> None:
|
||||
"""Checks the presence of users that have timed out and updates as
|
||||
appropriate.
|
||||
"""
|
||||
|
@ -814,7 +816,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
|
||||
return await self._update_states(changes)
|
||||
|
||||
async def bump_presence_active_time(self, user):
|
||||
async def bump_presence_active_time(self, user: UserID) -> None:
|
||||
"""We've seen the user do something that indicates they're interacting
|
||||
with the app.
|
||||
"""
|
||||
|
@ -911,17 +913,17 @@ class PresenceHandler(BasePresenceHandler):
|
|||
return []
|
||||
|
||||
async def update_external_syncs_row(
|
||||
self, process_id, user_id, is_syncing, sync_time_msec
|
||||
):
|
||||
self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
|
||||
) -> None:
|
||||
"""Update the syncing users for an external process as a delta.
|
||||
|
||||
Args:
|
||||
process_id (str): An identifier for the process the users are
|
||||
process_id: An identifier for the process the users are
|
||||
syncing against. This allows synapse to process updates
|
||||
as user start and stop syncing against a given process.
|
||||
user_id (str): The user who has started or stopped syncing
|
||||
is_syncing (bool): Whether or not the user is now syncing
|
||||
sync_time_msec(int): Time in ms when the user was last syncing
|
||||
user_id: The user who has started or stopped syncing
|
||||
is_syncing: Whether or not the user is now syncing
|
||||
sync_time_msec: Time in ms when the user was last syncing
|
||||
"""
|
||||
with (await self.external_sync_linearizer.queue(process_id)):
|
||||
prev_state = await self.current_state_for_user(user_id)
|
||||
|
@ -958,7 +960,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
|
||||
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
|
||||
|
||||
async def update_external_syncs_clear(self, process_id):
|
||||
async def update_external_syncs_clear(self, process_id: str) -> None:
|
||||
"""Marks all users that had been marked as syncing by a given process
|
||||
as offline.
|
||||
|
||||
|
@ -979,12 +981,12 @@ class PresenceHandler(BasePresenceHandler):
|
|||
)
|
||||
self.external_process_last_updated_ms.pop(process_id, None)
|
||||
|
||||
async def current_state_for_user(self, user_id):
|
||||
async def current_state_for_user(self, user_id: str) -> UserPresenceState:
|
||||
"""Get the current presence state for a user."""
|
||||
res = await self.current_state_for_users([user_id])
|
||||
return res[user_id]
|
||||
|
||||
async def _persist_and_notify(self, states):
|
||||
async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:
|
||||
"""Persist states in the database, poke the notifier and send to
|
||||
interested remote servers
|
||||
"""
|
||||
|
@ -1005,7 +1007,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
# stream (which is updated by `store.update_presence`).
|
||||
await self.maybe_send_presence_to_interested_destinations(states)
|
||||
|
||||
async def incoming_presence(self, origin, content):
|
||||
async def incoming_presence(self, origin: str, content: JsonDict) -> None:
|
||||
"""Called when we receive a `m.presence` EDU from a remote server."""
|
||||
if not self._presence_enabled:
|
||||
return
|
||||
|
@ -1055,7 +1057,9 @@ class PresenceHandler(BasePresenceHandler):
|
|||
federation_presence_counter.inc(len(updates))
|
||||
await self._update_states(updates)
|
||||
|
||||
async def set_state(self, target_user, state, ignore_status_msg=False):
|
||||
async def set_state(
|
||||
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
|
||||
) -> None:
|
||||
"""Set the presence state of the user."""
|
||||
status_msg = state.get("status_msg", None)
|
||||
presence = state["presence"]
|
||||
|
@ -1089,7 +1093,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
|
||||
await self._update_states([prev_state.copy_and_replace(**new_fields)])
|
||||
|
||||
async def is_visible(self, observed_user, observer_user):
|
||||
async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
|
||||
"""Returns whether a user can see another user's presence."""
|
||||
observer_room_ids = await self.store.get_rooms_for_user(
|
||||
observer_user.to_string()
|
||||
|
@ -1144,7 +1148,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
)
|
||||
return rows
|
||||
|
||||
def notify_new_event(self):
|
||||
def notify_new_event(self) -> None:
|
||||
"""Called when new events have happened. Handles users and servers
|
||||
joining rooms and require being sent presence.
|
||||
"""
|
||||
|
@ -1163,7 +1167,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
|
||||
run_as_background_process("presence.notify_new_event", _process_presence)
|
||||
|
||||
async def _unsafe_process(self):
|
||||
async def _unsafe_process(self) -> None:
|
||||
# Loop round handling deltas until we're up to date
|
||||
while True:
|
||||
with Measure(self.clock, "presence_delta"):
|
||||
|
@ -1179,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler):
|
|||
max_pos, deltas = await self.store.get_current_state_deltas(
|
||||
self._event_pos, room_max_stream_ordering
|
||||
)
|
||||
await self._handle_state_delta(deltas)
|
||||
|
||||
# We may get multiple deltas for different rooms, but we want to
|
||||
# handle them on a room by room basis, so we batch them up by
|
||||
# room.
|
||||
deltas_by_room: Dict[str, List[JsonDict]] = {}
|
||||
for delta in deltas:
|
||||
deltas_by_room.setdefault(delta["room_id"], []).append(delta)
|
||||
|
||||
for room_id, deltas_for_room in deltas_by_room.items():
|
||||
await self._handle_state_delta(room_id, deltas_for_room)
|
||||
|
||||
self._event_pos = max_pos
|
||||
|
||||
|
@ -1188,17 +1201,21 @@ class PresenceHandler(BasePresenceHandler):
|
|||
max_pos
|
||||
)
|
||||
|
||||
async def _handle_state_delta(self, deltas):
|
||||
"""Process current state deltas to find new joins that need to be
|
||||
handled.
|
||||
async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
|
||||
"""Process current state deltas for the room to find new joins that need
|
||||
to be handled.
|
||||
"""
|
||||
# A map of destination to a set of user state that they should receive
|
||||
presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]
|
||||
|
||||
# Sets of newly joined users. Note that if the local server is
|
||||
# joining a remote room for the first time we'll see both the joining
|
||||
# user and all remote users as newly joined.
|
||||
newly_joined_users = set()
|
||||
|
||||
for delta in deltas:
|
||||
assert room_id == delta["room_id"]
|
||||
|
||||
typ = delta["type"]
|
||||
state_key = delta["state_key"]
|
||||
room_id = delta["room_id"]
|
||||
event_id = delta["event_id"]
|
||||
prev_event_id = delta["prev_event_id"]
|
||||
|
||||
|
@ -1227,72 +1244,55 @@ class PresenceHandler(BasePresenceHandler):
|
|||
# Ignore changes to join events.
|
||||
continue
|
||||
|
||||
# Retrieve any user presence state updates that need to be sent as a result,
|
||||
# and the destinations that need to receive it
|
||||
destinations, user_presence_states = await self._on_user_joined_room(
|
||||
room_id, state_key
|
||||
)
|
||||
newly_joined_users.add(state_key)
|
||||
|
||||
# Insert the destinations and respective updates into our destinations dict
|
||||
for destination in destinations:
|
||||
presence_destinations.setdefault(destination, set()).update(
|
||||
user_presence_states
|
||||
)
|
||||
if not newly_joined_users:
|
||||
# If nobody has joined then there's nothing to do.
|
||||
return
|
||||
|
||||
# Send out user presence updates for each destination
|
||||
for destination, user_state_set in presence_destinations.items():
|
||||
self._federation_queue.send_presence_to_destinations(
|
||||
destinations=[destination], states=user_state_set
|
||||
)
|
||||
# We want to send:
|
||||
# 1. presence states of all local users in the room to newly joined
|
||||
# remote servers
|
||||
# 2. presence states of newly joined users to all remote servers in
|
||||
# the room.
|
||||
#
|
||||
# TODO: Only send presence states to remote hosts that don't already
|
||||
# have them (because they already share rooms).
|
||||
|
||||
async def _on_user_joined_room(
|
||||
self, room_id: str, user_id: str
|
||||
) -> Tuple[List[str], List[UserPresenceState]]:
|
||||
"""Called when we detect a user joining the room via the current state
|
||||
delta stream. Returns the destinations that need to be updated and the
|
||||
presence updates to send to them.
|
||||
# Get all the users who were already in the room, by fetching the
|
||||
# current users in the room and removing the newly joined users.
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
prev_users = set(users) - newly_joined_users
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room that the user has joined.
|
||||
user_id: The ID of the user that has joined the room.
|
||||
# Construct sets for all the local users and remote hosts that were
|
||||
# already in the room
|
||||
prev_local_users = []
|
||||
prev_remote_hosts = set()
|
||||
for user_id in prev_users:
|
||||
if self.is_mine_id(user_id):
|
||||
prev_local_users.append(user_id)
|
||||
else:
|
||||
prev_remote_hosts.add(get_domain_from_id(user_id))
|
||||
|
||||
Returns:
|
||||
A tuple of destinations and presence updates to send to them.
|
||||
"""
|
||||
if self.is_mine_id(user_id):
|
||||
# If this is a local user then we need to send their presence
|
||||
# out to hosts in the room (who don't already have it)
|
||||
# Similarly, construct sets for all the local users and remote hosts
|
||||
# that were *not* already in the room. Care needs to be taken with the
|
||||
# calculating the remote hosts, as a host may have already been in the
|
||||
# room even if there is a newly joined user from that host.
|
||||
newly_joined_local_users = []
|
||||
newly_joined_remote_hosts = set()
|
||||
for user_id in newly_joined_users:
|
||||
if self.is_mine_id(user_id):
|
||||
newly_joined_local_users.append(user_id)
|
||||
else:
|
||||
host = get_domain_from_id(user_id)
|
||||
if host not in prev_remote_hosts:
|
||||
newly_joined_remote_hosts.add(host)
|
||||
|
||||
# TODO: We should be able to filter the hosts down to those that
|
||||
# haven't previously seen the user
|
||||
|
||||
remote_hosts = await self.state.get_current_hosts_in_room(room_id)
|
||||
|
||||
# Filter out ourselves.
|
||||
filtered_remote_hosts = [
|
||||
host for host in remote_hosts if host != self.server_name
|
||||
]
|
||||
|
||||
state = await self.current_state_for_user(user_id)
|
||||
return filtered_remote_hosts, [state]
|
||||
else:
|
||||
# A remote user has joined the room, so we need to:
|
||||
# 1. Check if this is a new server in the room
|
||||
# 2. If so send any presence they don't already have for
|
||||
# local users in the room.
|
||||
|
||||
# TODO: We should be able to filter the users down to those that
|
||||
# the server hasn't previously seen
|
||||
|
||||
# TODO: Check that this is actually a new server joining the
|
||||
# room.
|
||||
|
||||
remote_host = get_domain_from_id(user_id)
|
||||
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
user_ids = list(filter(self.is_mine_id, users))
|
||||
|
||||
states_d = await self.current_state_for_users(user_ids)
|
||||
# Send presence states of all local users in the room to newly joined
|
||||
# remote servers. (We actually only send states for local users already
|
||||
# in the room, as we'll send states for newly joined local users below.)
|
||||
if prev_local_users and newly_joined_remote_hosts:
|
||||
local_states = await self.current_state_for_users(prev_local_users)
|
||||
|
||||
# Filter out old presence, i.e. offline presence states where
|
||||
# the user hasn't been active for a week. We can change this
|
||||
|
@ -1302,16 +1302,30 @@ class PresenceHandler(BasePresenceHandler):
|
|||
now = self.clock.time_msec()
|
||||
states = [
|
||||
state
|
||||
for state in states_d.values()
|
||||
for state in local_states.values()
|
||||
if state.state != PresenceState.OFFLINE
|
||||
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
|
||||
or state.status_msg is not None
|
||||
]
|
||||
|
||||
return [remote_host], states
|
||||
self._federation_queue.send_presence_to_destinations(
|
||||
destinations=newly_joined_remote_hosts,
|
||||
states=states,
|
||||
)
|
||||
|
||||
# Send presence states of newly joined users to all remote servers in
|
||||
# the room
|
||||
if newly_joined_local_users and (
|
||||
prev_remote_hosts or newly_joined_remote_hosts
|
||||
):
|
||||
local_states = await self.current_state_for_users(newly_joined_local_users)
|
||||
self._federation_queue.send_presence_to_destinations(
|
||||
destinations=prev_remote_hosts | newly_joined_remote_hosts,
|
||||
states=list(local_states.values()),
|
||||
)
|
||||
|
||||
|
||||
def should_notify(old_state, new_state):
|
||||
def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
|
||||
"""Decides if a presence state change should be sent to interested parties."""
|
||||
if old_state == new_state:
|
||||
return False
|
||||
|
@ -1347,7 +1361,9 @@ def should_notify(old_state, new_state):
|
|||
return False
|
||||
|
||||
|
||||
def format_user_presence_state(state, now, include_user_id=True):
|
||||
def format_user_presence_state(
|
||||
state: UserPresenceState, now: int, include_user_id: bool = True
|
||||
) -> JsonDict:
|
||||
"""Convert UserPresenceState to a format that can be sent down to clients
|
||||
and to other servers.
|
||||
|
||||
|
@ -1385,11 +1401,11 @@ class PresenceEventSource:
|
|||
@log_function
|
||||
async def get_new_events(
|
||||
self,
|
||||
user,
|
||||
from_key,
|
||||
room_ids=None,
|
||||
include_offline=True,
|
||||
explicit_room_id=None,
|
||||
user: UserID,
|
||||
from_key: Optional[int],
|
||||
room_ids: Optional[List[str]] = None,
|
||||
include_offline: bool = True,
|
||||
explicit_room_id: Optional[str] = None,
|
||||
**kwargs,
|
||||
) -> Tuple[List[UserPresenceState], int]:
|
||||
# The process for getting presence events are:
|
||||
|
@ -1594,7 +1610,7 @@ class PresenceEventSource:
|
|||
if update.state != PresenceState.OFFLINE
|
||||
]
|
||||
|
||||
def get_current_key(self):
|
||||
def get_current_key(self) -> int:
|
||||
return self.store.get_current_presence_token()
|
||||
|
||||
@cached(num_args=2, cache_context=True)
|
||||
|
@ -1654,15 +1670,20 @@ class PresenceEventSource:
|
|||
return users_interested_in
|
||||
|
||||
|
||||
def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
|
||||
def handle_timeouts(
|
||||
user_states: List[UserPresenceState],
|
||||
is_mine_fn: Callable[[str], bool],
|
||||
syncing_user_ids: Set[str],
|
||||
now: int,
|
||||
) -> List[UserPresenceState]:
|
||||
"""Checks the presence of users that have timed out and updates as
|
||||
appropriate.
|
||||
|
||||
Args:
|
||||
user_states(list): List of UserPresenceState's to check.
|
||||
is_mine_fn (fn): Function that returns if a user_id is ours
|
||||
syncing_user_ids (set): Set of user_ids with active syncs.
|
||||
now (int): Current time in ms.
|
||||
user_states: List of UserPresenceState's to check.
|
||||
is_mine_fn: Function that returns if a user_id is ours
|
||||
syncing_user_ids: Set of user_ids with active syncs.
|
||||
now: Current time in ms.
|
||||
|
||||
Returns:
|
||||
List of UserPresenceState updates
|
||||
|
@ -1679,14 +1700,16 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
|
|||
return list(changes.values())
|
||||
|
||||
|
||||
def handle_timeout(state, is_mine, syncing_user_ids, now):
|
||||
def handle_timeout(
|
||||
state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
|
||||
) -> Optional[UserPresenceState]:
|
||||
"""Checks the presence of the user to see if any of the timers have elapsed
|
||||
|
||||
Args:
|
||||
state (UserPresenceState)
|
||||
is_mine (bool): Whether the user is ours
|
||||
syncing_user_ids (set): Set of user_ids with active syncs.
|
||||
now (int): Current time in ms.
|
||||
state
|
||||
is_mine: Whether the user is ours
|
||||
syncing_user_ids: Set of user_ids with active syncs.
|
||||
now: Current time in ms.
|
||||
|
||||
Returns:
|
||||
A UserPresenceState update or None if no update.
|
||||
|
@ -1738,23 +1761,29 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
|
|||
return state if changed else None
|
||||
|
||||
|
||||
def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
|
||||
def handle_update(
|
||||
prev_state: UserPresenceState,
|
||||
new_state: UserPresenceState,
|
||||
is_mine: bool,
|
||||
wheel_timer: WheelTimer,
|
||||
now: int,
|
||||
) -> Tuple[UserPresenceState, bool, bool]:
|
||||
"""Given a presence update:
|
||||
1. Add any appropriate timers.
|
||||
2. Check if we should notify anyone.
|
||||
|
||||
Args:
|
||||
prev_state (UserPresenceState)
|
||||
new_state (UserPresenceState)
|
||||
is_mine (bool): Whether the user is ours
|
||||
wheel_timer (WheelTimer)
|
||||
now (int): Time now in ms
|
||||
prev_state
|
||||
new_state
|
||||
is_mine: Whether the user is ours
|
||||
wheel_timer
|
||||
now: Time now in ms
|
||||
|
||||
Returns:
|
||||
3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
|
||||
- new_state: is the state to actually persist
|
||||
- persist_and_notify (bool): whether to persist and notify people
|
||||
- federation_ping (bool): whether we should send a ping over federation
|
||||
- persist_and_notify: whether to persist and notify people
|
||||
- federation_ping: whether we should send a ping over federation
|
||||
"""
|
||||
user_id = new_state.user_id
|
||||
|
||||
|
|
|
@ -1327,7 +1327,7 @@ class RoomShutdownHandler:
|
|||
new_room_id = None
|
||||
logger.info("Shutting down room %r", room_id)
|
||||
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
kicked_users = []
|
||||
failed_to_kick_users = []
|
||||
for user_id in users:
|
||||
|
|
|
@ -1064,7 +1064,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
|||
|
||||
|
||||
class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
|
||||
self.distributor = hs.get_distributor()
|
||||
|
|
|
@ -288,6 +288,7 @@ class SpaceSummaryHandler:
|
|||
ev.data
|
||||
for ev in res.events
|
||||
if ev.event_type == EventTypes.MSC1772_SPACE_CHILD
|
||||
or ev.event_type == EventTypes.SpaceChild
|
||||
)
|
||||
|
||||
async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool:
|
||||
|
@ -331,7 +332,9 @@ class SpaceSummaryHandler:
|
|||
)
|
||||
|
||||
# TODO: update once MSC1772 lands
|
||||
room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE)
|
||||
room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
|
||||
if not room_type:
|
||||
room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE)
|
||||
|
||||
entry = {
|
||||
"room_id": stats["room_id"],
|
||||
|
@ -344,6 +347,7 @@ class SpaceSummaryHandler:
|
|||
stats["history_visibility"] == HistoryVisibility.WORLD_READABLE
|
||||
),
|
||||
"guest_can_join": stats["guest_access"] == "can_join",
|
||||
"creation_ts": create_event.origin_server_ts,
|
||||
"room_type": room_type,
|
||||
}
|
||||
|
||||
|
@ -360,8 +364,9 @@ class SpaceSummaryHandler:
|
|||
[
|
||||
event_id
|
||||
for key, event_id in current_state_ids.items()
|
||||
# TODO: update once MSC1772 lands
|
||||
# TODO: update once MSC1772 has been FCP for a period of time.
|
||||
if key[0] == EventTypes.MSC1772_SPACE_CHILD
|
||||
or key[0] == EventTypes.SpaceChild
|
||||
]
|
||||
)
|
||||
|
||||
|
|
|
@ -1191,7 +1191,7 @@ class SyncHandler:
|
|||
|
||||
# Step 1b, check for newly joined rooms
|
||||
for room_id in newly_joined_rooms:
|
||||
joined_users = await self.state.get_current_users_in_room(room_id)
|
||||
joined_users = await self.store.get_users_in_room(room_id)
|
||||
newly_joined_or_invited_users.update(joined_users)
|
||||
|
||||
# TODO: Check that these users are actually new, i.e. either they
|
||||
|
@ -1207,7 +1207,7 @@ class SyncHandler:
|
|||
|
||||
# Now find users that we no longer track
|
||||
for room_id in newly_left_rooms:
|
||||
left_users = await self.state.get_current_users_in_room(room_id)
|
||||
left_users = await self.store.get_users_in_room(room_id)
|
||||
newly_left_users.update(left_users)
|
||||
|
||||
# Remove any users that we still share a room with.
|
||||
|
@ -1362,7 +1362,7 @@ class SyncHandler:
|
|||
|
||||
extra_users_ids = set(newly_joined_or_invited_users)
|
||||
for room_id in newly_joined_rooms:
|
||||
users = await self.state.get_current_users_in_room(room_id)
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
extra_users_ids.update(users)
|
||||
extra_users_ids.discard(user.to_string())
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from twisted.web.client import PartialDownloadError
|
||||
|
||||
|
@ -22,13 +22,16 @@ from synapse.api.errors import Codes, LoginError, SynapseError
|
|||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.util import json_decoder
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UserInteractiveAuthChecker:
|
||||
"""Abstract base class for an interactive auth checker"""
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
pass
|
||||
|
||||
def is_enabled(self) -> bool:
|
||||
|
@ -57,10 +60,10 @@ class UserInteractiveAuthChecker:
|
|||
class DummyAuthChecker(UserInteractiveAuthChecker):
|
||||
AUTH_TYPE = LoginType.DUMMY
|
||||
|
||||
def is_enabled(self):
|
||||
def is_enabled(self) -> bool:
|
||||
return True
|
||||
|
||||
async def check_auth(self, authdict, clientip):
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
return True
|
||||
|
||||
|
||||
|
@ -70,24 +73,24 @@ class TermsAuthChecker(UserInteractiveAuthChecker):
|
|||
def is_enabled(self):
|
||||
return True
|
||||
|
||||
async def check_auth(self, authdict, clientip):
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
return True
|
||||
|
||||
|
||||
class RecaptchaAuthChecker(UserInteractiveAuthChecker):
|
||||
AUTH_TYPE = LoginType.RECAPTCHA
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
self._enabled = bool(hs.config.recaptcha_private_key)
|
||||
self._http_client = hs.get_proxied_http_client()
|
||||
self._url = hs.config.recaptcha_siteverify_api
|
||||
self._secret = hs.config.recaptcha_private_key
|
||||
|
||||
def is_enabled(self):
|
||||
def is_enabled(self) -> bool:
|
||||
return self._enabled
|
||||
|
||||
async def check_auth(self, authdict, clientip):
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
try:
|
||||
user_response = authdict["response"]
|
||||
except KeyError:
|
||||
|
@ -132,11 +135,11 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
|
|||
|
||||
|
||||
class _BaseThreepidAuthChecker:
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
async def _check_threepid(self, medium, authdict):
|
||||
async def _check_threepid(self, medium: str, authdict: dict) -> dict:
|
||||
if "threepid_creds" not in authdict:
|
||||
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
|
||||
|
||||
|
@ -206,31 +209,31 @@ class _BaseThreepidAuthChecker:
|
|||
class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
|
||||
AUTH_TYPE = LoginType.EMAIL_IDENTITY
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
UserInteractiveAuthChecker.__init__(self, hs)
|
||||
_BaseThreepidAuthChecker.__init__(self, hs)
|
||||
|
||||
def is_enabled(self):
|
||||
def is_enabled(self) -> bool:
|
||||
return self.hs.config.threepid_behaviour_email in (
|
||||
ThreepidBehaviour.REMOTE,
|
||||
ThreepidBehaviour.LOCAL,
|
||||
)
|
||||
|
||||
async def check_auth(self, authdict, clientip):
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
return await self._check_threepid("email", authdict)
|
||||
|
||||
|
||||
class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
|
||||
AUTH_TYPE = LoginType.MSISDN
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
UserInteractiveAuthChecker.__init__(self, hs)
|
||||
_BaseThreepidAuthChecker.__init__(self, hs)
|
||||
|
||||
def is_enabled(self):
|
||||
def is_enabled(self) -> bool:
|
||||
return bool(self.hs.config.account_threepid_delegate_msisdn)
|
||||
|
||||
async def check_auth(self, authdict, clientip):
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
return await self._check_threepid("msisdn", authdict)
|
||||
|
||||
|
||||
|
|
|
@ -535,6 +535,13 @@ class ReactorLastSeenMetric:
|
|||
|
||||
REGISTRY.register(ReactorLastSeenMetric())
|
||||
|
||||
# The minimum time in seconds between GCs for each generation, regardless of the current GC
|
||||
# thresholds and counts.
|
||||
MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
|
||||
|
||||
# The time (in seconds since the epoch) of the last time we did a GC for each generation.
|
||||
_last_gc = [0.0, 0.0, 0.0]
|
||||
|
||||
|
||||
def runUntilCurrentTimer(reactor, func):
|
||||
@functools.wraps(func)
|
||||
|
@ -575,11 +582,16 @@ def runUntilCurrentTimer(reactor, func):
|
|||
return ret
|
||||
|
||||
# Check if we need to do a manual GC (since its been disabled), and do
|
||||
# one if necessary.
|
||||
# one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
|
||||
# promote an object into gen 2, and we don't want to handle the same
|
||||
# object multiple times.
|
||||
threshold = gc.get_threshold()
|
||||
counts = gc.get_count()
|
||||
for i in (2, 1, 0):
|
||||
if threshold[i] < counts[i]:
|
||||
# We check if we need to do one based on a straightforward
|
||||
# comparison between the threshold and count. We also do an extra
|
||||
# check to make sure that we don't a GC too often.
|
||||
if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
|
||||
if i == 0:
|
||||
logger.debug("Collecting gc %d", i)
|
||||
else:
|
||||
|
@ -589,6 +601,8 @@ def runUntilCurrentTimer(reactor, func):
|
|||
unreachable = gc.collect(i)
|
||||
end = time.time()
|
||||
|
||||
_last_gc[i] = end
|
||||
|
||||
gc_time.labels(i).observe(end - start)
|
||||
gc_unreachable.labels(i).set(unreachable)
|
||||
|
||||
|
|
|
@ -116,6 +116,8 @@ CONDITIONAL_REQUIREMENTS = {
|
|||
# hiredis is not a *strict* dependency, but it makes things much faster.
|
||||
# (if it is not installed, we fall back to slow code.)
|
||||
"redis": ["txredisapi>=1.4.7", "hiredis"],
|
||||
# Required to use experimental `caches.track_memory_usage` config option.
|
||||
"cache_memory": ["pympler"],
|
||||
}
|
||||
|
||||
ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
from prometheus_client import Counter
|
||||
from prometheus_client import Counter, Histogram
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.util import json_decoder, json_encoder
|
||||
|
@ -35,6 +35,20 @@ get_counter = Counter(
|
|||
labelnames=["cache_name", "hit"],
|
||||
)
|
||||
|
||||
response_timer = Histogram(
|
||||
"synapse_external_cache_response_time_seconds",
|
||||
"Time taken to get a response from Redis for a cache get/set request",
|
||||
labelnames=["method"],
|
||||
buckets=(
|
||||
0.001,
|
||||
0.002,
|
||||
0.005,
|
||||
0.01,
|
||||
0.02,
|
||||
0.05,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -72,13 +86,14 @@ class ExternalCache:
|
|||
|
||||
logger.debug("Caching %s %s: %r", cache_name, key, encoded_value)
|
||||
|
||||
return await make_deferred_yieldable(
|
||||
self._redis_connection.set(
|
||||
self._get_redis_key(cache_name, key),
|
||||
encoded_value,
|
||||
pexpire=expiry_ms,
|
||||
with response_timer.labels("set").time():
|
||||
return await make_deferred_yieldable(
|
||||
self._redis_connection.set(
|
||||
self._get_redis_key(cache_name, key),
|
||||
encoded_value,
|
||||
pexpire=expiry_ms,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
async def get(self, cache_name: str, key: str) -> Optional[Any]:
|
||||
"""Look up a key/value in the named cache."""
|
||||
|
@ -86,9 +101,10 @@ class ExternalCache:
|
|||
if self._redis_connection is None:
|
||||
return None
|
||||
|
||||
result = await make_deferred_yieldable(
|
||||
self._redis_connection.get(self._get_redis_key(cache_name, key))
|
||||
)
|
||||
with response_timer.labels("get").time():
|
||||
result = await make_deferred_yieldable(
|
||||
self._redis_connection.get(self._get_redis_key(cache_name, key))
|
||||
)
|
||||
|
||||
logger.debug("Got cache result %s %s: %r", cache_name, key, result)
|
||||
|
||||
|
|
|
@ -37,9 +37,11 @@ from synapse.types import JsonDict, RoomAlias, RoomID, UserID, create_requester
|
|||
from synapse.util import json_decoder
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.api.auth import Auth
|
||||
from synapse.handlers.pagination import PaginationHandler
|
||||
from synapse.handlers.room import RoomShutdownHandler
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -146,50 +148,14 @@ class DeleteRoomRestServlet(RestServlet):
|
|||
async def on_POST(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
block = content.get("block", False)
|
||||
if not isinstance(block, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'block' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
purge = content.get("purge", True)
|
||||
if not isinstance(purge, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'purge' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
force_purge = content.get("force_purge", False)
|
||||
if not isinstance(force_purge, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'force_purge' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
ret = await self.room_shutdown_handler.shutdown_room(
|
||||
room_id=room_id,
|
||||
new_room_user_id=content.get("new_room_user_id"),
|
||||
new_room_name=content.get("room_name"),
|
||||
message=content.get("message"),
|
||||
requester_user_id=requester.user.to_string(),
|
||||
block=block,
|
||||
return await _delete_room(
|
||||
request,
|
||||
room_id,
|
||||
self.auth,
|
||||
self.room_shutdown_handler,
|
||||
self.pagination_handler,
|
||||
)
|
||||
|
||||
# Purge room
|
||||
if purge:
|
||||
await self.pagination_handler.purge_room(room_id, force=force_purge)
|
||||
|
||||
return (200, ret)
|
||||
|
||||
|
||||
class ListRoomRestServlet(RestServlet):
|
||||
"""
|
||||
|
@ -282,7 +248,22 @@ class ListRoomRestServlet(RestServlet):
|
|||
|
||||
|
||||
class RoomRestServlet(RestServlet):
|
||||
"""Get room details.
|
||||
"""Manage a room.
|
||||
|
||||
On GET : Get details of a room.
|
||||
|
||||
On DELETE : Delete a room from server.
|
||||
|
||||
It is a combination and improvement of shutdown and purge room.
|
||||
|
||||
Shuts down a room by removing all local users from the room.
|
||||
Blocking all future invites and joins to the room is optional.
|
||||
|
||||
If desired any local aliases will be repointed to a new room
|
||||
created by `new_room_user_id` and kicked users will be auto-
|
||||
joined to the new room.
|
||||
|
||||
If 'purge' is true, it will remove all traces of a room from the database.
|
||||
|
||||
TODO: Add on_POST to allow room creation without joining the room
|
||||
"""
|
||||
|
@ -293,6 +274,8 @@ class RoomRestServlet(RestServlet):
|
|||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.room_shutdown_handler = hs.get_room_shutdown_handler()
|
||||
self.pagination_handler = hs.get_pagination_handler()
|
||||
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
|
@ -308,6 +291,17 @@ class RoomRestServlet(RestServlet):
|
|||
|
||||
return (200, ret)
|
||||
|
||||
async def on_DELETE(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
return await _delete_room(
|
||||
request,
|
||||
room_id,
|
||||
self.auth,
|
||||
self.room_shutdown_handler,
|
||||
self.pagination_handler,
|
||||
)
|
||||
|
||||
|
||||
class RoomMembersRestServlet(RestServlet):
|
||||
"""
|
||||
|
@ -694,3 +688,55 @@ class RoomEventContextServlet(RestServlet):
|
|||
)
|
||||
|
||||
return 200, results
|
||||
|
||||
|
||||
async def _delete_room(
|
||||
request: SynapseRequest,
|
||||
room_id: str,
|
||||
auth: "Auth",
|
||||
room_shutdown_handler: "RoomShutdownHandler",
|
||||
pagination_handler: "PaginationHandler",
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(auth, requester.user)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
block = content.get("block", False)
|
||||
if not isinstance(block, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'block' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
purge = content.get("purge", True)
|
||||
if not isinstance(purge, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'purge' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
force_purge = content.get("force_purge", False)
|
||||
if not isinstance(force_purge, bool):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Param 'force_purge' must be a boolean, if given",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
ret = await room_shutdown_handler.shutdown_room(
|
||||
room_id=room_id,
|
||||
new_room_user_id=content.get("new_room_user_id"),
|
||||
new_room_name=content.get("room_name"),
|
||||
message=content.get("message"),
|
||||
requester_user_id=requester.user.to_string(),
|
||||
block=block,
|
||||
)
|
||||
|
||||
# Purge room
|
||||
if purge:
|
||||
await pagination_handler.purge_room(room_id, force=force_purge)
|
||||
|
||||
return (200, ret)
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
import itertools
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from typing import TYPE_CHECKING, Tuple
|
||||
|
||||
from synapse.api.constants import PresenceState
|
||||
|
@ -229,24 +230,49 @@ class SyncRestServlet(RestServlet):
|
|||
)
|
||||
|
||||
logger.debug("building sync response dict")
|
||||
return {
|
||||
"account_data": {"events": sync_result.account_data},
|
||||
"to_device": {"events": sync_result.to_device},
|
||||
"device_lists": {
|
||||
"changed": list(sync_result.device_lists.changed),
|
||||
"left": list(sync_result.device_lists.left),
|
||||
},
|
||||
"presence": SyncRestServlet.encode_presence(sync_result.presence, time_now),
|
||||
"rooms": {"join": joined, "invite": invited, "leave": archived},
|
||||
"groups": {
|
||||
"join": sync_result.groups.join,
|
||||
"invite": sync_result.groups.invite,
|
||||
"leave": sync_result.groups.leave,
|
||||
},
|
||||
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
|
||||
"org.matrix.msc2732.device_unused_fallback_key_types": sync_result.device_unused_fallback_key_types,
|
||||
"next_batch": await sync_result.next_batch.to_string(self.store),
|
||||
}
|
||||
|
||||
response: dict = defaultdict(dict)
|
||||
response["next_batch"] = await sync_result.next_batch.to_string(self.store)
|
||||
|
||||
if sync_result.account_data:
|
||||
response["account_data"] = {"events": sync_result.account_data}
|
||||
if sync_result.presence:
|
||||
response["presence"] = SyncRestServlet.encode_presence(
|
||||
sync_result.presence, time_now
|
||||
)
|
||||
|
||||
if sync_result.to_device:
|
||||
response["to_device"] = {"events": sync_result.to_device}
|
||||
|
||||
if sync_result.device_lists.changed:
|
||||
response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
|
||||
if sync_result.device_lists.left:
|
||||
response["device_lists"]["left"] = list(sync_result.device_lists.left)
|
||||
|
||||
if sync_result.device_one_time_keys_count:
|
||||
response[
|
||||
"device_one_time_keys_count"
|
||||
] = sync_result.device_one_time_keys_count
|
||||
if sync_result.device_unused_fallback_key_types:
|
||||
response[
|
||||
"org.matrix.msc2732.device_unused_fallback_key_types"
|
||||
] = sync_result.device_unused_fallback_key_types
|
||||
|
||||
if joined:
|
||||
response["rooms"]["join"] = joined
|
||||
if invited:
|
||||
response["rooms"]["invite"] = invited
|
||||
if archived:
|
||||
response["rooms"]["leave"] = archived
|
||||
|
||||
if sync_result.groups.join:
|
||||
response["groups"]["join"] = sync_result.groups.join
|
||||
if sync_result.groups.invite:
|
||||
response["groups"]["invite"] = sync_result.groups.invite
|
||||
if sync_result.groups.leave:
|
||||
response["groups"]["leave"] = sync_result.groups.leave
|
||||
|
||||
return response
|
||||
|
||||
@staticmethod
|
||||
def encode_presence(events, time_now):
|
||||
|
|
|
@ -213,19 +213,23 @@ class StateHandler:
|
|||
return ret.state
|
||||
|
||||
async def get_current_users_in_room(
|
||||
self, room_id: str, latest_event_ids: Optional[List[str]] = None
|
||||
self, room_id: str, latest_event_ids: List[str]
|
||||
) -> Dict[str, ProfileInfo]:
|
||||
"""
|
||||
Get the users who are currently in a room.
|
||||
|
||||
Note: This is much slower than using the equivalent method
|
||||
`DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`,
|
||||
so this should only be used when wanting the users at a particular point
|
||||
in the room.
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room.
|
||||
latest_event_ids: Precomputed list of latest event IDs. Will be computed if None.
|
||||
Returns:
|
||||
Dictionary of user IDs to their profileinfo.
|
||||
"""
|
||||
if not latest_event_ids:
|
||||
latest_event_ids = await self.store.get_latest_event_ids_in_room(room_id)
|
||||
|
||||
assert latest_event_ids is not None
|
||||
|
||||
logger.debug("calling resolve_state_groups from get_current_users_in_room")
|
||||
|
|
|
@ -69,6 +69,7 @@ class SQLBaseStore(metaclass=ABCMeta):
|
|||
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
|
||||
|
||||
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))
|
||||
|
||||
|
|
|
@ -715,7 +715,9 @@ class DatabasePool:
|
|||
# pool).
|
||||
assert not self.engine.in_transaction(conn)
|
||||
|
||||
with LoggingContext("runWithConnection", parent_context) as context:
|
||||
with LoggingContext(
|
||||
str(curr_context), parent_context=parent_context
|
||||
) as context:
|
||||
sched_duration_sec = monotonic_time() - start_time
|
||||
sql_scheduling_timer.observe(sched_duration_sec)
|
||||
context.add_database_scheduled(sched_duration_sec)
|
||||
|
|
|
@ -205,8 +205,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
|
||||
def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]:
|
||||
sql = """
|
||||
SELECT user_id, display_name, avatar_url FROM room_memberships
|
||||
WHERE room_id = ? AND membership = ?
|
||||
SELECT state_key, display_name, avatar_url FROM room_memberships as m
|
||||
INNER JOIN current_state_events as c
|
||||
ON m.event_id = c.event_id
|
||||
AND m.room_id = c.room_id
|
||||
AND m.user_id = c.state_key
|
||||
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
|
||||
"""
|
||||
txn.execute(sql, (room_id, Membership.JOIN))
|
||||
|
||||
|
|
|
@ -142,8 +142,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
|||
batch_size (int): Maximum number of state events to process
|
||||
per cycle.
|
||||
"""
|
||||
state = self.hs.get_state_handler()
|
||||
|
||||
# If we don't have progress filed, delete everything.
|
||||
if not progress:
|
||||
await self.delete_all_from_user_dir()
|
||||
|
@ -197,7 +195,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
|||
room_id
|
||||
)
|
||||
|
||||
users_with_profile = await state.get_current_users_in_room(room_id)
|
||||
users_with_profile = await self.get_users_in_room_with_profiles(room_id)
|
||||
user_ids = set(users_with_profile)
|
||||
|
||||
# Update each user in the user directory.
|
||||
|
|
|
@ -24,6 +24,11 @@ from synapse.config.cache import add_resizable_cache
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Whether to track estimated memory usage of the LruCaches.
|
||||
TRACK_MEMORY_USAGE = False
|
||||
|
||||
|
||||
caches_by_name = {} # type: Dict[str, Sized]
|
||||
collectors_by_name = {} # type: Dict[str, CacheMetric]
|
||||
|
||||
|
@ -32,6 +37,11 @@ cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
|
|||
cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
|
||||
cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
|
||||
cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
|
||||
cache_memory_usage = Gauge(
|
||||
"synapse_util_caches_cache_size_bytes",
|
||||
"Estimated memory usage of the caches",
|
||||
["name"],
|
||||
)
|
||||
|
||||
response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
|
||||
response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
|
||||
|
@ -52,6 +62,7 @@ class CacheMetric:
|
|||
hits = attr.ib(default=0)
|
||||
misses = attr.ib(default=0)
|
||||
evicted_size = attr.ib(default=0)
|
||||
memory_usage = attr.ib(default=None)
|
||||
|
||||
def inc_hits(self):
|
||||
self.hits += 1
|
||||
|
@ -62,6 +73,19 @@ class CacheMetric:
|
|||
def inc_evictions(self, size=1):
|
||||
self.evicted_size += size
|
||||
|
||||
def inc_memory_usage(self, memory: int):
|
||||
if self.memory_usage is None:
|
||||
self.memory_usage = 0
|
||||
|
||||
self.memory_usage += memory
|
||||
|
||||
def dec_memory_usage(self, memory: int):
|
||||
self.memory_usage -= memory
|
||||
|
||||
def clear_memory_usage(self):
|
||||
if self.memory_usage is not None:
|
||||
self.memory_usage = 0
|
||||
|
||||
def describe(self):
|
||||
return []
|
||||
|
||||
|
@ -81,6 +105,13 @@ class CacheMetric:
|
|||
cache_total.labels(self._cache_name).set(self.hits + self.misses)
|
||||
if getattr(self._cache, "max_size", None):
|
||||
cache_max_size.labels(self._cache_name).set(self._cache.max_size)
|
||||
|
||||
if TRACK_MEMORY_USAGE:
|
||||
# self.memory_usage can be None if nothing has been inserted
|
||||
# into the cache yet.
|
||||
cache_memory_usage.labels(self._cache_name).set(
|
||||
self.memory_usage or 0
|
||||
)
|
||||
if self._collect_callback:
|
||||
self._collect_callback()
|
||||
except Exception as e:
|
||||
|
|
|
@ -17,8 +17,10 @@ from functools import wraps
|
|||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Collection,
|
||||
Generic,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Type,
|
||||
TypeVar,
|
||||
|
@ -30,9 +32,36 @@ from typing import (
|
|||
from typing_extensions import Literal
|
||||
|
||||
from synapse.config import cache as cache_config
|
||||
from synapse.util import caches
|
||||
from synapse.util.caches import CacheMetric, register_cache
|
||||
from synapse.util.caches.treecache import TreeCache
|
||||
|
||||
try:
|
||||
from pympler.asizeof import Asizer
|
||||
|
||||
def _get_size_of(val: Any, *, recurse=True) -> int:
|
||||
"""Get an estimate of the size in bytes of the object.
|
||||
|
||||
Args:
|
||||
val: The object to size.
|
||||
recurse: If true will include referenced values in the size,
|
||||
otherwise only sizes the given object.
|
||||
"""
|
||||
# Ignore singleton values when calculating memory usage.
|
||||
if val in ((), None, ""):
|
||||
return 0
|
||||
|
||||
sizer = Asizer()
|
||||
sizer.exclude_refs((), None, "")
|
||||
return sizer.asizeof(val, limit=100 if recurse else 0)
|
||||
|
||||
|
||||
except ImportError:
|
||||
|
||||
def _get_size_of(val: Any, *, recurse=True) -> int:
|
||||
return 0
|
||||
|
||||
|
||||
# Function type: the type used for invalidation callbacks
|
||||
FT = TypeVar("FT", bound=Callable[..., Any])
|
||||
|
||||
|
@ -54,16 +83,69 @@ def enumerate_leaves(node, depth):
|
|||
|
||||
|
||||
class _Node:
|
||||
__slots__ = ["prev_node", "next_node", "key", "value", "callbacks"]
|
||||
__slots__ = ["prev_node", "next_node", "key", "value", "callbacks", "memory"]
|
||||
|
||||
def __init__(
|
||||
self, prev_node, next_node, key, value, callbacks: Optional[set] = None
|
||||
self,
|
||||
prev_node,
|
||||
next_node,
|
||||
key,
|
||||
value,
|
||||
callbacks: Collection[Callable[[], None]] = (),
|
||||
):
|
||||
self.prev_node = prev_node
|
||||
self.next_node = next_node
|
||||
self.key = key
|
||||
self.value = value
|
||||
self.callbacks = callbacks or set()
|
||||
|
||||
# Set of callbacks to run when the node gets deleted. We store as a list
|
||||
# rather than a set to keep memory usage down (and since we expect few
|
||||
# entries per node, the performance of checking for duplication in a
|
||||
# list vs using a set is negligible).
|
||||
#
|
||||
# Note that we store this as an optional list to keep the memory
|
||||
# footprint down. Storing `None` is free as its a singleton, while empty
|
||||
# lists are 56 bytes (and empty sets are 216 bytes, if we did the naive
|
||||
# thing and used sets).
|
||||
self.callbacks = None # type: Optional[List[Callable[[], None]]]
|
||||
|
||||
self.add_callbacks(callbacks)
|
||||
|
||||
self.memory = 0
|
||||
if caches.TRACK_MEMORY_USAGE:
|
||||
self.memory = (
|
||||
_get_size_of(key)
|
||||
+ _get_size_of(value)
|
||||
+ _get_size_of(self.callbacks, recurse=False)
|
||||
+ _get_size_of(self, recurse=False)
|
||||
)
|
||||
self.memory += _get_size_of(self.memory, recurse=False)
|
||||
|
||||
def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None:
|
||||
"""Add to stored list of callbacks, removing duplicates."""
|
||||
|
||||
if not callbacks:
|
||||
return
|
||||
|
||||
if not self.callbacks:
|
||||
self.callbacks = []
|
||||
|
||||
for callback in callbacks:
|
||||
if callback not in self.callbacks:
|
||||
self.callbacks.append(callback)
|
||||
|
||||
def run_and_clear_callbacks(self) -> None:
|
||||
"""Run all callbacks and clear the stored list of callbacks. Used when
|
||||
the node is being deleted.
|
||||
"""
|
||||
|
||||
if not self.callbacks:
|
||||
return
|
||||
|
||||
for callback in self.callbacks:
|
||||
callback()
|
||||
|
||||
self.callbacks = None
|
||||
|
||||
|
||||
class LruCache(Generic[KT, VT]):
|
||||
|
@ -177,10 +259,10 @@ class LruCache(Generic[KT, VT]):
|
|||
|
||||
self.len = synchronized(cache_len)
|
||||
|
||||
def add_node(key, value, callbacks: Optional[set] = None):
|
||||
def add_node(key, value, callbacks: Collection[Callable[[], None]] = ()):
|
||||
prev_node = list_root
|
||||
next_node = prev_node.next_node
|
||||
node = _Node(prev_node, next_node, key, value, callbacks or set())
|
||||
node = _Node(prev_node, next_node, key, value, callbacks)
|
||||
prev_node.next_node = node
|
||||
next_node.prev_node = node
|
||||
cache[key] = node
|
||||
|
@ -188,6 +270,9 @@ class LruCache(Generic[KT, VT]):
|
|||
if size_callback:
|
||||
cached_cache_len[0] += size_callback(node.value)
|
||||
|
||||
if caches.TRACK_MEMORY_USAGE and metrics:
|
||||
metrics.inc_memory_usage(node.memory)
|
||||
|
||||
def move_node_to_front(node):
|
||||
prev_node = node.prev_node
|
||||
next_node = node.next_node
|
||||
|
@ -211,16 +296,18 @@ class LruCache(Generic[KT, VT]):
|
|||
deleted_len = size_callback(node.value)
|
||||
cached_cache_len[0] -= deleted_len
|
||||
|
||||
for cb in node.callbacks:
|
||||
cb()
|
||||
node.callbacks.clear()
|
||||
node.run_and_clear_callbacks()
|
||||
|
||||
if caches.TRACK_MEMORY_USAGE and metrics:
|
||||
metrics.dec_memory_usage(node.memory)
|
||||
|
||||
return deleted_len
|
||||
|
||||
@overload
|
||||
def cache_get(
|
||||
key: KT,
|
||||
default: Literal[None] = None,
|
||||
callbacks: Iterable[Callable[[], None]] = ...,
|
||||
callbacks: Collection[Callable[[], None]] = ...,
|
||||
update_metrics: bool = ...,
|
||||
) -> Optional[VT]:
|
||||
...
|
||||
|
@ -229,7 +316,7 @@ class LruCache(Generic[KT, VT]):
|
|||
def cache_get(
|
||||
key: KT,
|
||||
default: T,
|
||||
callbacks: Iterable[Callable[[], None]] = ...,
|
||||
callbacks: Collection[Callable[[], None]] = ...,
|
||||
update_metrics: bool = ...,
|
||||
) -> Union[T, VT]:
|
||||
...
|
||||
|
@ -238,13 +325,13 @@ class LruCache(Generic[KT, VT]):
|
|||
def cache_get(
|
||||
key: KT,
|
||||
default: Optional[T] = None,
|
||||
callbacks: Iterable[Callable[[], None]] = (),
|
||||
callbacks: Collection[Callable[[], None]] = (),
|
||||
update_metrics: bool = True,
|
||||
):
|
||||
node = cache.get(key, None)
|
||||
if node is not None:
|
||||
move_node_to_front(node)
|
||||
node.callbacks.update(callbacks)
|
||||
node.add_callbacks(callbacks)
|
||||
if update_metrics and metrics:
|
||||
metrics.inc_hits()
|
||||
return node.value
|
||||
|
@ -260,10 +347,8 @@ class LruCache(Generic[KT, VT]):
|
|||
# We sometimes store large objects, e.g. dicts, which cause
|
||||
# the inequality check to take a long time. So let's only do
|
||||
# the check if we have some callbacks to call.
|
||||
if node.callbacks and value != node.value:
|
||||
for cb in node.callbacks:
|
||||
cb()
|
||||
node.callbacks.clear()
|
||||
if value != node.value:
|
||||
node.run_and_clear_callbacks()
|
||||
|
||||
# We don't bother to protect this by value != node.value as
|
||||
# generally size_callback will be cheap compared with equality
|
||||
|
@ -273,7 +358,7 @@ class LruCache(Generic[KT, VT]):
|
|||
cached_cache_len[0] -= size_callback(node.value)
|
||||
cached_cache_len[0] += size_callback(value)
|
||||
|
||||
node.callbacks.update(callbacks)
|
||||
node.add_callbacks(callbacks)
|
||||
|
||||
move_node_to_front(node)
|
||||
node.value = value
|
||||
|
@ -326,12 +411,14 @@ class LruCache(Generic[KT, VT]):
|
|||
list_root.next_node = list_root
|
||||
list_root.prev_node = list_root
|
||||
for node in cache.values():
|
||||
for cb in node.callbacks:
|
||||
cb()
|
||||
node.run_and_clear_callbacks()
|
||||
cache.clear()
|
||||
if size_callback:
|
||||
cached_cache_len[0] = 0
|
||||
|
||||
if caches.TRACK_MEMORY_USAGE and metrics:
|
||||
metrics.clear_memory_usage()
|
||||
|
||||
@synchronized
|
||||
def cache_contains(key: KT) -> bool:
|
||||
return key in cache
|
||||
|
|
|
@ -729,7 +729,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
|||
)
|
||||
self.assertEqual(expected_state.state, PresenceState.ONLINE)
|
||||
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
|
||||
destinations=["server2"], states={expected_state}
|
||||
destinations={"server2"}, states=[expected_state]
|
||||
)
|
||||
|
||||
#
|
||||
|
@ -740,7 +740,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
|||
self._add_new_user(room_id, "@bob:server3")
|
||||
|
||||
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
|
||||
destinations=["server3"], states={expected_state}
|
||||
destinations={"server3"}, states=[expected_state]
|
||||
)
|
||||
|
||||
def test_remote_gets_presence_when_local_user_joins(self):
|
||||
|
@ -788,14 +788,8 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
|
|||
self.presence_handler.current_state_for_user("@test2:server")
|
||||
)
|
||||
self.assertEqual(expected_state.state, PresenceState.ONLINE)
|
||||
self.assertEqual(
|
||||
self.federation_sender.send_presence_to_destinations.call_count, 2
|
||||
)
|
||||
self.federation_sender.send_presence_to_destinations.assert_any_call(
|
||||
destinations=["server3"], states={expected_state}
|
||||
)
|
||||
self.federation_sender.send_presence_to_destinations.assert_any_call(
|
||||
destinations=["server2"], states={expected_state}
|
||||
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
|
||||
destinations={"server2", "server3"}, states=[expected_state]
|
||||
)
|
||||
|
||||
def _add_new_user(self, room_id, user_id):
|
||||
|
|
|
@ -17,6 +17,8 @@ import urllib.parse
|
|||
from typing import List, Optional
|
||||
from unittest.mock import Mock
|
||||
|
||||
from parameterized import parameterized_class
|
||||
|
||||
import synapse.rest.admin
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import Codes
|
||||
|
@ -144,6 +146,13 @@ class ShutdownRoomTestCase(unittest.HomeserverTestCase):
|
|||
)
|
||||
|
||||
|
||||
@parameterized_class(
|
||||
("method", "url_template"),
|
||||
[
|
||||
("POST", "/_synapse/admin/v1/rooms/%s/delete"),
|
||||
("DELETE", "/_synapse/admin/v1/rooms/%s"),
|
||||
],
|
||||
)
|
||||
class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
||||
servlets = [
|
||||
synapse.rest.admin.register_servlets,
|
||||
|
@ -175,7 +184,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
self.room_id = self.helper.create_room_as(
|
||||
self.other_user, tok=self.other_user_tok
|
||||
)
|
||||
self.url = "/_synapse/admin/v1/rooms/%s/delete" % self.room_id
|
||||
self.url = self.url_template % self.room_id
|
||||
|
||||
def test_requester_is_no_admin(self):
|
||||
"""
|
||||
|
@ -183,7 +192,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
"""
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.method,
|
||||
self.url,
|
||||
json.dumps({}),
|
||||
access_token=self.other_user_tok,
|
||||
|
@ -196,10 +205,10 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
"""
|
||||
Check that unknown rooms/server return error 404.
|
||||
"""
|
||||
url = "/_synapse/admin/v1/rooms/!unknown:test/delete"
|
||||
url = self.url_template % "!unknown:test"
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.method,
|
||||
url,
|
||||
json.dumps({}),
|
||||
access_token=self.admin_user_tok,
|
||||
|
@ -212,10 +221,10 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
"""
|
||||
Check that invalid room names, return an error 400.
|
||||
"""
|
||||
url = "/_synapse/admin/v1/rooms/invalidroom/delete"
|
||||
url = self.url_template % "invalidroom"
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.method,
|
||||
url,
|
||||
json.dumps({}),
|
||||
access_token=self.admin_user_tok,
|
||||
|
@ -234,7 +243,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
body = json.dumps({"new_room_user_id": "@unknown:test"})
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.method,
|
||||
self.url,
|
||||
content=body.encode(encoding="utf_8"),
|
||||
access_token=self.admin_user_tok,
|
||||
|
@ -253,7 +262,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
body = json.dumps({"new_room_user_id": "@not:exist.bla"})
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.method,
|
||||
self.url,
|
||||
content=body.encode(encoding="utf_8"),
|
||||
access_token=self.admin_user_tok,
|
||||
|
@ -272,7 +281,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
body = json.dumps({"block": "NotBool"})
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.method,
|
||||
self.url,
|
||||
content=body.encode(encoding="utf_8"),
|
||||
access_token=self.admin_user_tok,
|
||||
|
@ -288,7 +297,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
body = json.dumps({"purge": "NotBool"})
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.method,
|
||||
self.url,
|
||||
content=body.encode(encoding="utf_8"),
|
||||
access_token=self.admin_user_tok,
|
||||
|
@ -314,7 +323,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
body = json.dumps({"block": True, "purge": True})
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.method,
|
||||
self.url.encode("ascii"),
|
||||
content=body.encode(encoding="utf_8"),
|
||||
access_token=self.admin_user_tok,
|
||||
|
@ -347,7 +356,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
body = json.dumps({"block": False, "purge": True})
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.method,
|
||||
self.url.encode("ascii"),
|
||||
content=body.encode(encoding="utf_8"),
|
||||
access_token=self.admin_user_tok,
|
||||
|
@ -381,7 +390,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
body = json.dumps({"block": False, "purge": False})
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
self.method,
|
||||
self.url.encode("ascii"),
|
||||
content=body.encode(encoding="utf_8"),
|
||||
access_token=self.admin_user_tok,
|
||||
|
@ -426,10 +435,9 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
self._is_member(room_id=self.room_id, user_id=self.other_user)
|
||||
|
||||
# Test that the admin can still send shutdown
|
||||
url = "/_synapse/admin/v1/rooms/%s/delete" % self.room_id
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
url.encode("ascii"),
|
||||
self.method,
|
||||
self.url,
|
||||
json.dumps({"new_room_user_id": self.admin_user}),
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
|
@ -473,10 +481,9 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
|
|||
self._is_member(room_id=self.room_id, user_id=self.other_user)
|
||||
|
||||
# Test that the admin can still send shutdown
|
||||
url = "/_synapse/admin/v1/rooms/%s/delete" % self.room_id
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
url.encode("ascii"),
|
||||
self.method,
|
||||
self.url,
|
||||
json.dumps({"new_room_user_id": self.admin_user}),
|
||||
access_token=self.admin_user_tok,
|
||||
)
|
||||
|
|
|
@ -37,35 +37,7 @@ class FilterTestCase(unittest.HomeserverTestCase):
|
|||
channel = self.make_request("GET", "/sync")
|
||||
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.assertTrue(
|
||||
{
|
||||
"next_batch",
|
||||
"rooms",
|
||||
"presence",
|
||||
"account_data",
|
||||
"to_device",
|
||||
"device_lists",
|
||||
}.issubset(set(channel.json_body.keys()))
|
||||
)
|
||||
|
||||
def test_sync_presence_disabled(self):
|
||||
"""
|
||||
When presence is disabled, the key does not appear in /sync.
|
||||
"""
|
||||
self.hs.config.use_presence = False
|
||||
|
||||
channel = self.make_request("GET", "/sync")
|
||||
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.assertTrue(
|
||||
{
|
||||
"next_batch",
|
||||
"rooms",
|
||||
"account_data",
|
||||
"to_device",
|
||||
"device_lists",
|
||||
}.issubset(set(channel.json_body.keys()))
|
||||
)
|
||||
self.assertIn("next_batch", channel.json_body)
|
||||
|
||||
|
||||
class SyncFilterTestCase(unittest.HomeserverTestCase):
|
||||
|
|
|
@ -306,8 +306,9 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase):
|
|||
|
||||
channel = self.make_request("GET", "/sync?timeout=0", access_token=tok)
|
||||
|
||||
invites = channel.json_body["rooms"]["invite"]
|
||||
self.assertEqual(len(invites), 0, invites)
|
||||
self.assertNotIn(
|
||||
"rooms", channel.json_body, "Got invites without server notice"
|
||||
)
|
||||
|
||||
def test_invite_with_notice(self):
|
||||
"""Tests that, if the MAU limit is hit, the server notices user invites each user
|
||||
|
@ -364,7 +365,8 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase):
|
|||
# We could also pick another user and sync with it, which would return an
|
||||
# invite to a system notices room, but it doesn't matter which user we're
|
||||
# using so we use the last one because it saves us an extra sync.
|
||||
invites = channel.json_body["rooms"]["invite"]
|
||||
if "rooms" in channel.json_body:
|
||||
invites = channel.json_body["rooms"]["invite"]
|
||||
|
||||
# Make sure we have an invite to process.
|
||||
self.assertEqual(len(invites), 1, invites)
|
||||
|
|
Loading…
Reference in a new issue