Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
This commit is contained in:
commit
8ba1086801
1
changelog.d/7606.bugfix
Normal file
1
changelog.d/7606.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Remove `user_id` from the response to `GET /_matrix/client/r0/presence/{userId}/status` to match the specification.
|
1
changelog.d/7636.misc
Normal file
1
changelog.d/7636.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Refactor getting replication updates from database.
|
1
changelog.d/7639.feature
Normal file
1
changelog.d/7639.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add an option to enable encryption by default for new rooms.
|
1
changelog.d/7648.bugfix
Normal file
1
changelog.d/7648.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
In working mode, ensure that replicated data has not already been received.
|
1
changelog.d/7652.doc
Normal file
1
changelog.d/7652.doc
Normal file
|
@ -0,0 +1 @@
|
|||
Spelling correction in sample_config.yaml.
|
1
changelog.d/7657.misc
Normal file
1
changelog.d/7657.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Clean-up the login fallback code.
|
1
changelog.d/7659.doc
Normal file
1
changelog.d/7659.doc
Normal file
|
@ -0,0 +1 @@
|
|||
Added instructions for how to use Keycloak via OpenID Connect to authenticate with Synapse.
|
1
changelog.d/7663.bugfix
Normal file
1
changelog.d/7663.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix intermittent exception during startup, introduced in Synapse 1.14.0.
|
1
changelog.d/7664.misc
Normal file
1
changelog.d/7664.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Increase the default SAML session expirary time to 15 minutes.
|
1
changelog.d/7673.feature
Normal file
1
changelog.d/7673.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add a per-room counter for unread messages in responses to `/sync` requests. Implements [MSC2625](https://github.com/matrix-org/matrix-doc/pull/2625).
|
1
changelog.d/7677.bugfix
Normal file
1
changelog.d/7677.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Include a user-agent for federation and well-known requests.
|
1
changelog.d/7678.misc
Normal file
1
changelog.d/7678.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Convert the device message and pagination handlers to async/await.
|
1
changelog.d/7681.misc
Normal file
1
changelog.d/7681.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Refactor handling of `listeners` configuration settings.
|
1
changelog.d/7687.bugfix
Normal file
1
changelog.d/7687.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Accept the proper field (`phone`) for the `m.id.phone` identifier type. The legacy field of `number` is still accepted as a fallback. Bug introduced in v0.20.0-rc1.
|
1
changelog.d/7688.bugfix
Normal file
1
changelog.d/7688.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix "Starting db txn 'get_completed_ui_auth_stages' from sentinel context" warning. The bug was introduced in 1.13.0rc1.
|
1
changelog.d/7689.bugfix
Normal file
1
changelog.d/7689.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Compare the URI and method during user interactive authentication (instead of the URI twice). Bug introduced in 1.13.0rc1.
|
1
changelog.d/7691.bugfix
Normal file
1
changelog.d/7691.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix a long standing bug where the response to the `GET room_keys/version` endpoint had the incorrect type for the `etag` field.
|
1
changelog.d/7692.misc
Normal file
1
changelog.d/7692.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Replace uses of `six.iterkeys`/`iteritems`/`itervalues` with `keys()`/`items()`/`values()`.
|
1
changelog.d/7697.misc
Normal file
1
changelog.d/7697.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add support for using `rust-python-jaeger-reporter` library to reduce jaeger tracing overhead.
|
1
changelog.d/7698.bugfix
Normal file
1
changelog.d/7698.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix logged error during device resync in opentracing. Broke in v1.14.0.
|
1
changelog.d/7701.bugfix
Normal file
1
changelog.d/7701.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Do not break push rule evaluation when receiving an event with a non-string body. This is a long-standing bug.
|
1
changelog.d/7704.misc
Normal file
1
changelog.d/7704.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Replace all remaining uses of `six` with native Python 3 equivalents. Contributed by @ilmari.
|
1
changelog.d/7706.feature
Normal file
1
changelog.d/7706.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add support for running multiple media repository workers. See [docs/workers.md](docs/workers.md) for instructions.
|
1
changelog.d/7708.bugfix
Normal file
1
changelog.d/7708.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fixs a long standing bug which resulted in an exception: "TypeError: argument of type 'ObservableDeferred' is not iterable".
|
1
changelog.d/7712.misc
Normal file
1
changelog.d/7712.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Fix broken link in sample config.
|
1
changelog.d/7714.bugfix
Normal file
1
changelog.d/7714.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Synapse will now fetch media from the proper specified URL (using the r0 prefix instead of the unspecified v1).
|
|
@ -24,8 +24,6 @@ import argparse
|
|||
from synapse.events import FrozenEvent
|
||||
from synapse.util.frozenutils import unfreeze
|
||||
|
||||
from six import string_types
|
||||
|
||||
|
||||
def make_graph(file_name, room_id, file_prefix, limit):
|
||||
print("Reading lines")
|
||||
|
@ -62,7 +60,7 @@ def make_graph(file_name, room_id, file_prefix, limit):
|
|||
for key, value in unfreeze(event.get_dict()["content"]).items():
|
||||
if value is None:
|
||||
value = "<null>"
|
||||
elif isinstance(value, string_types):
|
||||
elif isinstance(value, str):
|
||||
pass
|
||||
else:
|
||||
value = json.dumps(value)
|
||||
|
|
|
@ -23,6 +23,7 @@ such as [Github][github-idp].
|
|||
[auth0]: https://auth0.com/
|
||||
[okta]: https://www.okta.com/
|
||||
[dex-idp]: https://github.com/dexidp/dex
|
||||
[keycloak-idp]: https://www.keycloak.org/docs/latest/server_admin/#sso-protocols
|
||||
[hydra]: https://www.ory.sh/docs/hydra/
|
||||
[github-idp]: https://developer.github.com/apps/building-oauth-apps/authorizing-oauth-apps
|
||||
|
||||
|
@ -89,7 +90,50 @@ oidc_config:
|
|||
localpart_template: "{{ user.name }}"
|
||||
display_name_template: "{{ user.name|capitalize }}"
|
||||
```
|
||||
### [Keycloak][keycloak-idp]
|
||||
|
||||
[Keycloak][keycloak-idp] is an opensource IdP maintained by Red Hat.
|
||||
|
||||
Follow the [Getting Started Guide](https://www.keycloak.org/getting-started) to install Keycloak and set up a realm.
|
||||
|
||||
1. Click `Clients` in the sidebar and click `Create`
|
||||
|
||||
2. Fill in the fields as below:
|
||||
|
||||
| Field | Value |
|
||||
|-----------|-----------|
|
||||
| Client ID | `synapse` |
|
||||
| Client Protocol | `openid-connect` |
|
||||
|
||||
3. Click `Save`
|
||||
4. Fill in the fields as below:
|
||||
|
||||
| Field | Value |
|
||||
|-----------|-----------|
|
||||
| Client ID | `synapse` |
|
||||
| Enabled | `On` |
|
||||
| Client Protocol | `openid-connect` |
|
||||
| Access Type | `confidential` |
|
||||
| Valid Redirect URIs | `[synapse public baseurl]/_synapse/oidc/callback` |
|
||||
|
||||
5. Click `Save`
|
||||
6. On the Credentials tab, update the fields:
|
||||
|
||||
| Field | Value |
|
||||
|-------|-------|
|
||||
| Client Authenticator | `Client ID and Secret` |
|
||||
|
||||
7. Click `Regenerate Secret`
|
||||
8. Copy Secret
|
||||
|
||||
```yaml
|
||||
oidc_config:
|
||||
enabled: true
|
||||
issuer: "https://127.0.0.1:8443/auth/realms/{realm_name}"
|
||||
client_id: "synapse"
|
||||
client_secret: "copy secret generated from above"
|
||||
scopes: ["openid", "profile"]
|
||||
```
|
||||
### [Auth0][auth0]
|
||||
|
||||
1. Create a regular web application for Synapse
|
||||
|
|
|
@ -283,7 +283,7 @@ listeners:
|
|||
# number of monthly active users.
|
||||
#
|
||||
# 'limit_usage_by_mau' disables/enables monthly active user blocking. When
|
||||
# anabled and a limit is reached the server returns a 'ResourceLimitError'
|
||||
# enabled and a limit is reached the server returns a 'ResourceLimitError'
|
||||
# with error type Codes.RESOURCE_LIMIT_EXCEEDED
|
||||
#
|
||||
# 'max_mau_value' is the hard limit of monthly active users above which
|
||||
|
@ -1454,7 +1454,7 @@ saml2_config:
|
|||
|
||||
# The lifetime of a SAML session. This defines how long a user has to
|
||||
# complete the authentication process, if allow_unsolicited is unset.
|
||||
# The default is 5 minutes.
|
||||
# The default is 15 minutes.
|
||||
#
|
||||
#saml_session_lifetime: 5m
|
||||
|
||||
|
@ -1539,7 +1539,7 @@ saml2_config:
|
|||
# use an OpenID Connect Provider for authentication, instead of its internal
|
||||
# password database.
|
||||
#
|
||||
# See https://github.com/matrix-org/synapse/blob/master/openid.md.
|
||||
# See https://github.com/matrix-org/synapse/blob/master/docs/openid.md.
|
||||
#
|
||||
oidc_config:
|
||||
# Uncomment the following to enable authorization against an OpenID Connect
|
||||
|
@ -1973,6 +1973,26 @@ spam_checker:
|
|||
# example_stop_events_from: ['@bad:example.com']
|
||||
|
||||
|
||||
## Rooms ##
|
||||
|
||||
# Controls whether locally-created rooms should be end-to-end encrypted by
|
||||
# default.
|
||||
#
|
||||
# Possible options are "all", "invite", and "off". They are defined as:
|
||||
#
|
||||
# * "all": any locally-created room
|
||||
# * "invite": any room created with the "private_chat" or "trusted_private_chat"
|
||||
# room creation presets
|
||||
# * "off": this option will take no effect
|
||||
#
|
||||
# The default value is "off".
|
||||
#
|
||||
# Note that this option will only affect rooms created after it is set. It
|
||||
# will also not affect rooms created by other servers.
|
||||
#
|
||||
#encryption_enabled_by_default_for_room_type: invite
|
||||
|
||||
|
||||
# Uncomment to allow non-server-admin users to create groups on this server
|
||||
#
|
||||
#enable_group_creation: true
|
||||
|
|
|
@ -307,7 +307,12 @@ expose the `media` resource. For example:
|
|||
- media
|
||||
```
|
||||
|
||||
Note this worker cannot be load-balanced: only one instance should be active.
|
||||
Note that if running multiple media repositories they must be on the same server
|
||||
and you must configure a single instance to run the background tasks, e.g.:
|
||||
|
||||
```yaml
|
||||
media_instance_running_background_jobs: "media-repository-1"
|
||||
```
|
||||
|
||||
### `synapse.app.client_reader`
|
||||
|
||||
|
|
3
mypy.ini
3
mypy.ini
|
@ -78,3 +78,6 @@ ignore_missing_imports = True
|
|||
|
||||
[mypy-authlib.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-rust_python_jaeger_reporter.*]
|
||||
ignore_missing_imports = True
|
||||
|
|
|
@ -21,8 +21,7 @@ import argparse
|
|||
import base64
|
||||
import json
|
||||
import sys
|
||||
|
||||
from six.moves.urllib import parse as urlparse
|
||||
from urllib import parse as urlparse
|
||||
|
||||
import nacl.signing
|
||||
import requests
|
||||
|
|
|
@ -23,8 +23,6 @@ import sys
|
|||
import time
|
||||
import traceback
|
||||
|
||||
from six import string_types
|
||||
|
||||
import yaml
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
|
@ -635,7 +633,7 @@ class Porter(object):
|
|||
return bool(col)
|
||||
if isinstance(col, bytes):
|
||||
return bytearray(col)
|
||||
elif isinstance(col, string_types) and "\0" in col:
|
||||
elif isinstance(col, str) and "\0" in col:
|
||||
logger.warning(
|
||||
"DROPPING ROW: NUL value in table %s col %s: %r",
|
||||
table,
|
||||
|
|
|
@ -31,7 +31,7 @@ sections=FUTURE,STDLIB,COMPAT,THIRDPARTY,TWISTED,FIRSTPARTY,TESTS,LOCALFOLDER
|
|||
default_section=THIRDPARTY
|
||||
known_first_party = synapse
|
||||
known_tests=tests
|
||||
known_compat = mock,six
|
||||
known_compat = mock
|
||||
known_twisted=twisted,OpenSSL
|
||||
multi_line_output=3
|
||||
include_trailing_comma=true
|
||||
|
|
|
@ -23,8 +23,6 @@ import hmac
|
|||
import logging
|
||||
import sys
|
||||
|
||||
from six.moves import input
|
||||
|
||||
import requests as _requests
|
||||
import yaml
|
||||
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from six import itervalues
|
||||
|
||||
import pymacaroons
|
||||
from netaddr import IPAddress
|
||||
|
||||
|
@ -90,7 +88,7 @@ class Auth(object):
|
|||
event, prev_state_ids, for_verification=True
|
||||
)
|
||||
auth_events = yield self.store.get_events(auth_events_ids)
|
||||
auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)}
|
||||
auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
|
||||
|
||||
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
|
||||
event_auth.check(
|
||||
|
|
|
@ -150,3 +150,8 @@ class EventContentFields(object):
|
|||
# Timestamp to delete the event after
|
||||
# cf https://github.com/matrix-org/matrix-doc/pull/2228
|
||||
SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"
|
||||
|
||||
|
||||
class RoomEncryptionAlgorithms(object):
|
||||
MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
|
||||
DEFAULT = MEGOLM_V1_AES_SHA2
|
||||
|
|
|
@ -17,11 +17,9 @@
|
|||
"""Contains exceptions and error codes."""
|
||||
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
from typing import Dict, List
|
||||
|
||||
from six import iteritems
|
||||
from six.moves import http_client
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.web import http
|
||||
|
@ -174,7 +172,7 @@ class ConsentNotGivenError(SynapseError):
|
|||
consent_url (str): The URL where the user can give their consent
|
||||
"""
|
||||
super(ConsentNotGivenError, self).__init__(
|
||||
code=http_client.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN
|
||||
code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN
|
||||
)
|
||||
self._consent_uri = consent_uri
|
||||
|
||||
|
@ -194,7 +192,7 @@ class UserDeactivatedError(SynapseError):
|
|||
msg (str): The human-readable error message
|
||||
"""
|
||||
super(UserDeactivatedError, self).__init__(
|
||||
code=http_client.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
|
||||
code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
|
||||
)
|
||||
|
||||
|
||||
|
@ -497,7 +495,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
|
|||
A dict representing the error response JSON.
|
||||
"""
|
||||
err = {"error": msg, "errcode": code}
|
||||
for key, value in iteritems(kwargs):
|
||||
for key, value in kwargs.items():
|
||||
err[key] = value
|
||||
return err
|
||||
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
# limitations under the License.
|
||||
from typing import List
|
||||
|
||||
from six import text_type
|
||||
|
||||
import jsonschema
|
||||
from canonicaljson import json
|
||||
from jsonschema import FormatChecker
|
||||
|
@ -313,7 +311,7 @@ class Filter(object):
|
|||
|
||||
content = event.get("content", {})
|
||||
# check if there is a string url field in the content for filtering purposes
|
||||
contains_url = isinstance(content.get("url"), text_type)
|
||||
contains_url = isinstance(content.get("url"), str)
|
||||
labels = content.get(EventContentFields.LABELS, [])
|
||||
|
||||
return self.check_fields(room_id, sender, ev_type, labels, contains_url)
|
||||
|
|
|
@ -17,8 +17,7 @@
|
|||
"""Contains the URL paths to prefix various aspects of the server with. """
|
||||
import hmac
|
||||
from hashlib import sha256
|
||||
|
||||
from six.moves.urllib.parse import urlencode
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from synapse.config import ConfigError
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import signal
|
|||
import socket
|
||||
import sys
|
||||
import traceback
|
||||
from typing import Iterable
|
||||
|
||||
from daemonize import Daemonize
|
||||
from typing_extensions import NoReturn
|
||||
|
@ -29,6 +30,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory
|
|||
|
||||
import synapse
|
||||
from synapse.app import check_bind_error
|
||||
from synapse.config.server import ListenerConfig
|
||||
from synapse.crypto import context_factory
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
|
@ -234,7 +236,7 @@ def refresh_certificate(hs):
|
|||
logger.info("Context factories updated.")
|
||||
|
||||
|
||||
def start(hs, listeners=None):
|
||||
def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
|
||||
"""
|
||||
Start a Synapse server or worker.
|
||||
|
||||
|
@ -245,8 +247,8 @@ def start(hs, listeners=None):
|
|||
notify systemd.
|
||||
|
||||
Args:
|
||||
hs (synapse.server.HomeServer)
|
||||
listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
|
||||
hs: homeserver instance
|
||||
listeners: Listener configuration ('listeners' in homeserver.yaml)
|
||||
"""
|
||||
try:
|
||||
# Set up the SIGHUP machinery.
|
||||
|
|
|
@ -37,6 +37,7 @@ from synapse.app import _base
|
|||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.config.server import ListenerConfig
|
||||
from synapse.federation import send_queue
|
||||
from synapse.federation.transport.server import TransportLayerServer
|
||||
from synapse.handlers.presence import (
|
||||
|
@ -514,13 +515,18 @@ class GenericWorkerSlavedStore(
|
|||
class GenericWorkerServer(HomeServer):
|
||||
DATASTORE_CLASS = GenericWorkerSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
site_tag = listener_config.get("tag", port)
|
||||
def _listen_http(self, listener_config: ListenerConfig):
|
||||
port = listener_config.port
|
||||
bind_addresses = listener_config.bind_addresses
|
||||
|
||||
assert listener_config.http_options is not None
|
||||
|
||||
site_tag = listener_config.http_options.tag
|
||||
if site_tag is None:
|
||||
site_tag = port
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
for res in listener_config.http_options.resources:
|
||||
for name in res.names:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
elif name == "client":
|
||||
|
@ -590,7 +596,7 @@ class GenericWorkerServer(HomeServer):
|
|||
" repository is disabled. Ignoring."
|
||||
)
|
||||
|
||||
if name == "openid" and "federation" not in res["names"]:
|
||||
if name == "openid" and "federation" not in res.names:
|
||||
# Only load the openid resource separately if federation resource
|
||||
# is not specified since federation resource includes openid
|
||||
# resource.
|
||||
|
@ -625,19 +631,19 @@ class GenericWorkerServer(HomeServer):
|
|||
|
||||
logger.info("Synapse worker now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
def start_listening(self, listeners: Iterable[ListenerConfig]):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
if listener.type == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
elif listener.type == "manhole":
|
||||
_base.listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
listener.bind_addresses,
|
||||
listener.port,
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "metrics":
|
||||
elif listener.type == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
|
@ -646,9 +652,9 @@ class GenericWorkerServer(HomeServer):
|
|||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
_base.listen_metrics(listener.bind_addresses, listener.port)
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
logger.warning("Unsupported listener type: %s", listener.type)
|
||||
|
||||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
|
@ -738,6 +744,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
|
|||
except Exception:
|
||||
logger.exception("Error processing replication")
|
||||
|
||||
async def on_position(self, stream_name: str, instance_name: str, token: int):
|
||||
await super().on_position(stream_name, instance_name, token)
|
||||
# Also call on_rdata to ensure that stream positions are properly reset.
|
||||
await self.on_rdata(stream_name, instance_name, token, [])
|
||||
|
||||
def stop_pusher(self, user_id, app_id, pushkey):
|
||||
if not self.notify_pushers:
|
||||
return
|
||||
|
|
|
@ -23,8 +23,7 @@ import math
|
|||
import os
|
||||
import resource
|
||||
import sys
|
||||
|
||||
from six import iteritems
|
||||
from typing import Iterable
|
||||
|
||||
from prometheus_client import Gauge
|
||||
|
||||
|
@ -50,6 +49,7 @@ from synapse.app import _base
|
|||
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.server import ListenerConfig
|
||||
from synapse.federation.transport.server import TransportLayerServer
|
||||
from synapse.http.additional_resource import AdditionalResource
|
||||
from synapse.http.server import (
|
||||
|
@ -89,24 +89,24 @@ def gz_wrap(r):
|
|||
class SynapseHomeServer(HomeServer):
|
||||
DATASTORE_CLASS = DataStore
|
||||
|
||||
def _listener_http(self, config, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
tls = listener_config.get("tls", False)
|
||||
site_tag = listener_config.get("tag", port)
|
||||
def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig):
|
||||
port = listener_config.port
|
||||
bind_addresses = listener_config.bind_addresses
|
||||
tls = listener_config.tls
|
||||
site_tag = listener_config.http_options.tag
|
||||
if site_tag is None:
|
||||
site_tag = port
|
||||
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "openid" and "federation" in res["names"]:
|
||||
for res in listener_config.http_options.resources:
|
||||
for name in res.names:
|
||||
if name == "openid" and "federation" in res.names:
|
||||
# Skip loading openid resource if federation is defined
|
||||
# since federation resource will include openid
|
||||
continue
|
||||
resources.update(
|
||||
self._configure_named_resource(name, res.get("compress", False))
|
||||
)
|
||||
resources.update(self._configure_named_resource(name, res.compress))
|
||||
|
||||
additional_resources = listener_config.get("additional_resources", {})
|
||||
additional_resources = listener_config.http_options.additional_resources
|
||||
logger.debug("Configuring additional resources: %r", additional_resources)
|
||||
module_api = ModuleApi(self, self.get_auth_handler())
|
||||
for path, resmodule in additional_resources.items():
|
||||
|
@ -278,7 +278,7 @@ class SynapseHomeServer(HomeServer):
|
|||
|
||||
return resources
|
||||
|
||||
def start_listening(self, listeners):
|
||||
def start_listening(self, listeners: Iterable[ListenerConfig]):
|
||||
config = self.get_config()
|
||||
|
||||
if config.redis_enabled:
|
||||
|
@ -288,25 +288,25 @@ class SynapseHomeServer(HomeServer):
|
|||
self.get_tcp_replication().start_replication(self)
|
||||
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
if listener.type == "http":
|
||||
self._listening_services.extend(self._listener_http(config, listener))
|
||||
elif listener["type"] == "manhole":
|
||||
elif listener.type == "manhole":
|
||||
listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
listener.bind_addresses,
|
||||
listener.port,
|
||||
manhole(
|
||||
username="matrix", password="rabbithole", globals={"hs": self}
|
||||
),
|
||||
)
|
||||
elif listener["type"] == "replication":
|
||||
elif listener.type == "replication":
|
||||
services = listen_tcp(
|
||||
listener["bind_addresses"],
|
||||
listener["port"],
|
||||
listener.bind_addresses,
|
||||
listener.port,
|
||||
ReplicationStreamProtocolFactory(self),
|
||||
)
|
||||
for s in services:
|
||||
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
|
||||
elif listener["type"] == "metrics":
|
||||
elif listener.type == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warning(
|
||||
(
|
||||
|
@ -315,9 +315,11 @@ class SynapseHomeServer(HomeServer):
|
|||
)
|
||||
)
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"], listener["port"])
|
||||
_base.listen_metrics(listener.bind_addresses, listener.port)
|
||||
else:
|
||||
logger.warning("Unrecognized listener type: %s", listener["type"])
|
||||
# this shouldn't happen, as the listener type should have been checked
|
||||
# during parsing
|
||||
logger.warning("Unrecognized listener type: %s", listener.type)
|
||||
|
||||
|
||||
# Gauges to expose monthly active user control metrics
|
||||
|
@ -525,7 +527,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
|
|||
stats["total_nonbridged_users"] = total_nonbridged_users
|
||||
|
||||
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
|
||||
for name, count in iteritems(daily_user_type_results):
|
||||
for name, count in daily_user_type_results.items():
|
||||
stats["daily_user_type_" + name] = count
|
||||
|
||||
room_count = yield hs.get_datastore().get_room_count()
|
||||
|
@ -537,7 +539,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
|
|||
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
|
||||
|
||||
r30_results = yield hs.get_datastore().count_r30_users()
|
||||
for name, count in iteritems(r30_results):
|
||||
for name, count in r30_results.items():
|
||||
stats["r30_users_" + name] = count
|
||||
|
||||
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
import logging
|
||||
import re
|
||||
|
||||
from six import string_types
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
|
@ -156,7 +154,7 @@ class ApplicationService(object):
|
|||
)
|
||||
|
||||
regex = regex_obj.get("regex")
|
||||
if isinstance(regex, string_types):
|
||||
if isinstance(regex, str):
|
||||
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
|
||||
else:
|
||||
raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
|
||||
|
|
|
@ -13,8 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
from six.moves import urllib
|
||||
import urllib
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
|
|
|
@ -22,8 +22,6 @@ from collections import OrderedDict
|
|||
from textwrap import dedent
|
||||
from typing import Any, MutableMapping, Optional
|
||||
|
||||
from six import integer_types
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
|
@ -117,7 +115,7 @@ class Config(object):
|
|||
|
||||
@staticmethod
|
||||
def parse_size(value):
|
||||
if isinstance(value, integer_types):
|
||||
if isinstance(value, int):
|
||||
return value
|
||||
sizes = {"K": 1024, "M": 1024 * 1024}
|
||||
size = 1
|
||||
|
@ -129,7 +127,7 @@ class Config(object):
|
|||
|
||||
@staticmethod
|
||||
def parse_duration(value):
|
||||
if isinstance(value, integer_types):
|
||||
if isinstance(value, int):
|
||||
return value
|
||||
second = 1000
|
||||
minute = 60 * second
|
||||
|
|
|
@ -14,9 +14,7 @@
|
|||
|
||||
import logging
|
||||
from typing import Dict
|
||||
|
||||
from six import string_types
|
||||
from six.moves.urllib import parse as urlparse
|
||||
from urllib import parse as urlparse
|
||||
|
||||
import yaml
|
||||
from netaddr import IPSet
|
||||
|
@ -98,17 +96,14 @@ def load_appservices(hostname, config_files):
|
|||
def _load_appservice(hostname, as_info, config_filename):
|
||||
required_string_fields = ["id", "as_token", "hs_token", "sender_localpart"]
|
||||
for field in required_string_fields:
|
||||
if not isinstance(as_info.get(field), string_types):
|
||||
if not isinstance(as_info.get(field), str):
|
||||
raise KeyError(
|
||||
"Required string field: '%s' (%s)" % (field, config_filename)
|
||||
)
|
||||
|
||||
# 'url' must either be a string or explicitly null, not missing
|
||||
# to avoid accidentally turning off push for ASes.
|
||||
if (
|
||||
not isinstance(as_info.get("url"), string_types)
|
||||
and as_info.get("url", "") is not None
|
||||
):
|
||||
if not isinstance(as_info.get("url"), str) and as_info.get("url", "") is not None:
|
||||
raise KeyError(
|
||||
"Required string field or explicit null: 'url' (%s)" % (config_filename,)
|
||||
)
|
||||
|
@ -138,7 +133,7 @@ def _load_appservice(hostname, as_info, config_filename):
|
|||
ns,
|
||||
regex_obj,
|
||||
)
|
||||
if not isinstance(regex_obj.get("regex"), string_types):
|
||||
if not isinstance(regex_obj.get("regex"), str):
|
||||
raise ValueError("Missing/bad type 'regex' key in %s", regex_obj)
|
||||
if not isinstance(regex_obj.get("exclusive"), bool):
|
||||
raise ValueError(
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
from typing import Callable, Dict
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
|
@ -25,6 +26,9 @@ _CACHE_PREFIX = "SYNAPSE_CACHE_FACTOR"
|
|||
# Map from canonicalised cache name to cache.
|
||||
_CACHES = {}
|
||||
|
||||
# a lock on the contents of _CACHES
|
||||
_CACHES_LOCK = threading.Lock()
|
||||
|
||||
_DEFAULT_FACTOR_SIZE = 0.5
|
||||
_DEFAULT_EVENT_CACHE_SIZE = "10K"
|
||||
|
||||
|
@ -66,7 +70,10 @@ def add_resizable_cache(cache_name: str, cache_resize_callback: Callable):
|
|||
# Some caches have '*' in them which we strip out.
|
||||
cache_name = _canonicalise_cache_name(cache_name)
|
||||
|
||||
_CACHES[cache_name] = cache_resize_callback
|
||||
# sometimes caches are initialised from background threads, so we need to make
|
||||
# sure we don't conflict with another thread running a resize operation
|
||||
with _CACHES_LOCK:
|
||||
_CACHES[cache_name] = cache_resize_callback
|
||||
|
||||
# Ensure all loaded caches are sized appropriately
|
||||
#
|
||||
|
@ -87,7 +94,8 @@ class CacheConfig(Config):
|
|||
os.environ.get(_CACHE_PREFIX, _DEFAULT_FACTOR_SIZE)
|
||||
)
|
||||
properties.resize_all_caches_func = None
|
||||
_CACHES.clear()
|
||||
with _CACHES_LOCK:
|
||||
_CACHES.clear()
|
||||
|
||||
def generate_config_section(self, **kwargs):
|
||||
return """\
|
||||
|
@ -193,6 +201,8 @@ class CacheConfig(Config):
|
|||
For each cache, run the mapped callback function with either
|
||||
a specific cache factor or the default, global one.
|
||||
"""
|
||||
for cache_name, callback in _CACHES.items():
|
||||
new_factor = self.cache_factors.get(cache_name, self.global_factor)
|
||||
callback(new_factor)
|
||||
# block other threads from modifying _CACHES while we iterate it.
|
||||
with _CACHES_LOCK:
|
||||
for cache_name, callback in _CACHES.items():
|
||||
new_factor = self.cache_factors.get(cache_name, self.global_factor)
|
||||
callback(new_factor)
|
||||
|
|
|
@ -36,6 +36,7 @@ from .ratelimiting import RatelimitConfig
|
|||
from .redis import RedisConfig
|
||||
from .registration import RegistrationConfig
|
||||
from .repository import ContentRepositoryConfig
|
||||
from .room import RoomConfig
|
||||
from .room_directory import RoomDirectoryConfig
|
||||
from .saml2_config import SAML2Config
|
||||
from .server import ServerConfig
|
||||
|
@ -79,6 +80,7 @@ class HomeServerConfig(RootConfig):
|
|||
PasswordAuthProviderConfig,
|
||||
PushConfig,
|
||||
SpamCheckerConfig,
|
||||
RoomConfig,
|
||||
GroupsConfig,
|
||||
UserDirectoryConfig,
|
||||
ConsentConfig,
|
||||
|
|
|
@ -89,7 +89,7 @@ class OIDCConfig(Config):
|
|||
# use an OpenID Connect Provider for authentication, instead of its internal
|
||||
# password database.
|
||||
#
|
||||
# See https://github.com/matrix-org/synapse/blob/master/openid.md.
|
||||
# See https://github.com/matrix-org/synapse/blob/master/docs/openid.md.
|
||||
#
|
||||
oidc_config:
|
||||
# Uncomment the following to enable authorization against an OpenID Connect
|
||||
|
|
|
@ -94,6 +94,12 @@ class ContentRepositoryConfig(Config):
|
|||
else:
|
||||
self.can_load_media_repo = True
|
||||
|
||||
# Whether this instance should be the one to run the background jobs to
|
||||
# e.g clean up old URL previews.
|
||||
self.media_instance_running_background_jobs = config.get(
|
||||
"media_instance_running_background_jobs",
|
||||
)
|
||||
|
||||
self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
|
||||
self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
|
||||
self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
|
||||
|
|
80
synapse/config/room.py
Normal file
80
synapse/config/room.py
Normal file
|
@ -0,0 +1,80 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from synapse.api.constants import RoomCreationPreset
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
|
||||
logger = logging.Logger(__name__)
|
||||
|
||||
|
||||
class RoomDefaultEncryptionTypes(object):
|
||||
"""Possible values for the encryption_enabled_by_default_for_room_type config option"""
|
||||
|
||||
ALL = "all"
|
||||
INVITE = "invite"
|
||||
OFF = "off"
|
||||
|
||||
|
||||
class RoomConfig(Config):
|
||||
section = "room"
|
||||
|
||||
def read_config(self, config, **kwargs):
|
||||
# Whether new, locally-created rooms should have encryption enabled
|
||||
encryption_for_room_type = config.get(
|
||||
"encryption_enabled_by_default_for_room_type",
|
||||
RoomDefaultEncryptionTypes.OFF,
|
||||
)
|
||||
if encryption_for_room_type == RoomDefaultEncryptionTypes.ALL:
|
||||
self.encryption_enabled_by_default_for_room_presets = [
|
||||
RoomCreationPreset.PRIVATE_CHAT,
|
||||
RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
|
||||
RoomCreationPreset.PUBLIC_CHAT,
|
||||
]
|
||||
elif encryption_for_room_type == RoomDefaultEncryptionTypes.INVITE:
|
||||
self.encryption_enabled_by_default_for_room_presets = [
|
||||
RoomCreationPreset.PRIVATE_CHAT,
|
||||
RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
|
||||
]
|
||||
elif encryption_for_room_type == RoomDefaultEncryptionTypes.OFF:
|
||||
self.encryption_enabled_by_default_for_room_presets = []
|
||||
else:
|
||||
raise ConfigError(
|
||||
"Invalid value for encryption_enabled_by_default_for_room_type"
|
||||
)
|
||||
|
||||
def generate_config_section(self, **kwargs):
|
||||
return """\
|
||||
## Rooms ##
|
||||
|
||||
# Controls whether locally-created rooms should be end-to-end encrypted by
|
||||
# default.
|
||||
#
|
||||
# Possible options are "all", "invite", and "off". They are defined as:
|
||||
#
|
||||
# * "all": any locally-created room
|
||||
# * "invite": any room created with the "private_chat" or "trusted_private_chat"
|
||||
# room creation presets
|
||||
# * "off": this option will take no effect
|
||||
#
|
||||
# The default value is "off".
|
||||
#
|
||||
# Note that this option will only affect rooms created after it is set. It
|
||||
# will also not affect rooms created by other servers.
|
||||
#
|
||||
#encryption_enabled_by_default_for_room_type: invite
|
||||
"""
|
|
@ -160,7 +160,7 @@ class SAML2Config(Config):
|
|||
|
||||
# session lifetime: in milliseconds
|
||||
self.saml2_session_lifetime = self.parse_duration(
|
||||
saml2_config.get("saml_session_lifetime", "5m")
|
||||
saml2_config.get("saml_session_lifetime", "15m")
|
||||
)
|
||||
|
||||
template_dir = saml2_config.get("template_dir")
|
||||
|
@ -286,7 +286,7 @@ class SAML2Config(Config):
|
|||
|
||||
# The lifetime of a SAML session. This defines how long a user has to
|
||||
# complete the authentication process, if allow_unsolicited is unset.
|
||||
# The default is 5 minutes.
|
||||
# The default is 15 minutes.
|
||||
#
|
||||
#saml_session_lifetime: 5m
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ import logging
|
|||
import os.path
|
||||
import re
|
||||
from textwrap import indent
|
||||
from typing import Dict, List, Optional
|
||||
from typing import Any, Dict, Iterable, List, Optional
|
||||
|
||||
import attr
|
||||
import yaml
|
||||
|
@ -57,6 +57,64 @@ on how to configure the new listener.
|
|||
--------------------------------------------------------------------------------"""
|
||||
|
||||
|
||||
KNOWN_LISTENER_TYPES = {
|
||||
"http",
|
||||
"metrics",
|
||||
"manhole",
|
||||
"replication",
|
||||
}
|
||||
|
||||
KNOWN_RESOURCES = {
|
||||
"client",
|
||||
"consent",
|
||||
"federation",
|
||||
"keys",
|
||||
"media",
|
||||
"metrics",
|
||||
"openid",
|
||||
"replication",
|
||||
"static",
|
||||
"webclient",
|
||||
}
|
||||
|
||||
|
||||
@attr.s(frozen=True)
|
||||
class HttpResourceConfig:
|
||||
names = attr.ib(
|
||||
type=List[str],
|
||||
factory=list,
|
||||
validator=attr.validators.deep_iterable(attr.validators.in_(KNOWN_RESOURCES)), # type: ignore
|
||||
)
|
||||
compress = attr.ib(
|
||||
type=bool,
|
||||
default=False,
|
||||
validator=attr.validators.optional(attr.validators.instance_of(bool)), # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
|
||||
@attr.s(frozen=True)
|
||||
class HttpListenerConfig:
|
||||
"""Object describing the http-specific parts of the config of a listener"""
|
||||
|
||||
x_forwarded = attr.ib(type=bool, default=False)
|
||||
resources = attr.ib(type=List[HttpResourceConfig], factory=list)
|
||||
additional_resources = attr.ib(type=Dict[str, dict], factory=dict)
|
||||
tag = attr.ib(type=str, default=None)
|
||||
|
||||
|
||||
@attr.s(frozen=True)
|
||||
class ListenerConfig:
|
||||
"""Object describing the configuration of a single listener."""
|
||||
|
||||
port = attr.ib(type=int, validator=attr.validators.instance_of(int))
|
||||
bind_addresses = attr.ib(type=List[str])
|
||||
type = attr.ib(type=str, validator=attr.validators.in_(KNOWN_LISTENER_TYPES))
|
||||
tls = attr.ib(type=bool, default=False)
|
||||
|
||||
# http_options is only populated if type=http
|
||||
http_options = attr.ib(type=Optional[HttpListenerConfig], default=None)
|
||||
|
||||
|
||||
class ServerConfig(Config):
|
||||
section = "server"
|
||||
|
||||
|
@ -379,38 +437,21 @@ class ServerConfig(Config):
|
|||
}
|
||||
]
|
||||
|
||||
self.listeners = [] # type: List[dict]
|
||||
for listener in config.get("listeners", []):
|
||||
if not isinstance(listener.get("port", None), int):
|
||||
raise ConfigError(
|
||||
"Listener configuration is lacking a valid 'port' option"
|
||||
)
|
||||
self.listeners = [parse_listener_def(x) for x in config.get("listeners", [])]
|
||||
|
||||
if listener.setdefault("tls", False):
|
||||
# no_tls is not really supported any more, but let's grandfather it in
|
||||
# here.
|
||||
if config.get("no_tls", False):
|
||||
# no_tls is not really supported any more, but let's grandfather it in
|
||||
# here.
|
||||
if config.get("no_tls", False):
|
||||
l2 = []
|
||||
for listener in self.listeners:
|
||||
if listener.tls:
|
||||
logger.info(
|
||||
"Ignoring TLS-enabled listener on port %i due to no_tls"
|
||||
"Ignoring TLS-enabled listener on port %i due to no_tls",
|
||||
listener.port,
|
||||
)
|
||||
continue
|
||||
|
||||
bind_address = listener.pop("bind_address", None)
|
||||
bind_addresses = listener.setdefault("bind_addresses", [])
|
||||
|
||||
# if bind_address was specified, add it to the list of addresses
|
||||
if bind_address:
|
||||
bind_addresses.append(bind_address)
|
||||
|
||||
# if we still have an empty list of addresses, use the default list
|
||||
if not bind_addresses:
|
||||
if listener["type"] == "metrics":
|
||||
# the metrics listener doesn't support IPv6
|
||||
bind_addresses.append("0.0.0.0")
|
||||
else:
|
||||
bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
|
||||
|
||||
self.listeners.append(listener)
|
||||
l2.append(listener)
|
||||
self.listeners = l2
|
||||
|
||||
if not self.web_client_location:
|
||||
_warn_if_webclient_configured(self.listeners)
|
||||
|
@ -446,43 +487,41 @@ class ServerConfig(Config):
|
|||
bind_host = config.get("bind_host", "")
|
||||
gzip_responses = config.get("gzip_responses", True)
|
||||
|
||||
http_options = HttpListenerConfig(
|
||||
resources=[
|
||||
HttpResourceConfig(names=["client"], compress=gzip_responses),
|
||||
HttpResourceConfig(names=["federation"]),
|
||||
],
|
||||
)
|
||||
|
||||
self.listeners.append(
|
||||
{
|
||||
"port": bind_port,
|
||||
"bind_addresses": [bind_host],
|
||||
"tls": True,
|
||||
"type": "http",
|
||||
"resources": [
|
||||
{"names": ["client"], "compress": gzip_responses},
|
||||
{"names": ["federation"], "compress": False},
|
||||
],
|
||||
}
|
||||
ListenerConfig(
|
||||
port=bind_port,
|
||||
bind_addresses=[bind_host],
|
||||
tls=True,
|
||||
type="http",
|
||||
http_options=http_options,
|
||||
)
|
||||
)
|
||||
|
||||
unsecure_port = config.get("unsecure_port", bind_port - 400)
|
||||
if unsecure_port:
|
||||
self.listeners.append(
|
||||
{
|
||||
"port": unsecure_port,
|
||||
"bind_addresses": [bind_host],
|
||||
"tls": False,
|
||||
"type": "http",
|
||||
"resources": [
|
||||
{"names": ["client"], "compress": gzip_responses},
|
||||
{"names": ["federation"], "compress": False},
|
||||
],
|
||||
}
|
||||
ListenerConfig(
|
||||
port=unsecure_port,
|
||||
bind_addresses=[bind_host],
|
||||
tls=False,
|
||||
type="http",
|
||||
http_options=http_options,
|
||||
)
|
||||
)
|
||||
|
||||
manhole = config.get("manhole")
|
||||
if manhole:
|
||||
self.listeners.append(
|
||||
{
|
||||
"port": manhole,
|
||||
"bind_addresses": ["127.0.0.1"],
|
||||
"type": "manhole",
|
||||
"tls": False,
|
||||
}
|
||||
ListenerConfig(
|
||||
port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
|
||||
)
|
||||
)
|
||||
|
||||
metrics_port = config.get("metrics_port")
|
||||
|
@ -490,13 +529,14 @@ class ServerConfig(Config):
|
|||
logger.warning(METRICS_PORT_WARNING)
|
||||
|
||||
self.listeners.append(
|
||||
{
|
||||
"port": metrics_port,
|
||||
"bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")],
|
||||
"tls": False,
|
||||
"type": "http",
|
||||
"resources": [{"names": ["metrics"], "compress": False}],
|
||||
}
|
||||
ListenerConfig(
|
||||
port=metrics_port,
|
||||
bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")],
|
||||
type="http",
|
||||
http_options=HttpListenerConfig(
|
||||
resources=[HttpResourceConfig(names=["metrics"])]
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
_check_resource_config(self.listeners)
|
||||
|
@ -522,7 +562,7 @@ class ServerConfig(Config):
|
|||
)
|
||||
|
||||
def has_tls_listener(self) -> bool:
|
||||
return any(listener["tls"] for listener in self.listeners)
|
||||
return any(listener.tls for listener in self.listeners)
|
||||
|
||||
def generate_config_section(
|
||||
self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
|
||||
|
@ -856,7 +896,7 @@ class ServerConfig(Config):
|
|||
# number of monthly active users.
|
||||
#
|
||||
# 'limit_usage_by_mau' disables/enables monthly active user blocking. When
|
||||
# anabled and a limit is reached the server returns a 'ResourceLimitError'
|
||||
# enabled and a limit is reached the server returns a 'ResourceLimitError'
|
||||
# with error type Codes.RESOURCE_LIMIT_EXCEEDED
|
||||
#
|
||||
# 'max_mau_value' is the hard limit of monthly active users above which
|
||||
|
@ -1081,6 +1121,44 @@ def read_gc_thresholds(thresholds):
|
|||
)
|
||||
|
||||
|
||||
def parse_listener_def(listener: Any) -> ListenerConfig:
|
||||
"""parse a listener config from the config file"""
|
||||
listener_type = listener["type"]
|
||||
|
||||
port = listener.get("port")
|
||||
if not isinstance(port, int):
|
||||
raise ConfigError("Listener configuration is lacking a valid 'port' option")
|
||||
|
||||
tls = listener.get("tls", False)
|
||||
|
||||
bind_addresses = listener.get("bind_addresses", [])
|
||||
bind_address = listener.get("bind_address")
|
||||
# if bind_address was specified, add it to the list of addresses
|
||||
if bind_address:
|
||||
bind_addresses.append(bind_address)
|
||||
|
||||
# if we still have an empty list of addresses, use the default list
|
||||
if not bind_addresses:
|
||||
if listener_type == "metrics":
|
||||
# the metrics listener doesn't support IPv6
|
||||
bind_addresses.append("0.0.0.0")
|
||||
else:
|
||||
bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
|
||||
|
||||
http_config = None
|
||||
if listener_type == "http":
|
||||
http_config = HttpListenerConfig(
|
||||
x_forwarded=listener.get("x_forwarded", False),
|
||||
resources=[
|
||||
HttpResourceConfig(**res) for res in listener.get("resources", [])
|
||||
],
|
||||
additional_resources=listener.get("additional_resources", {}),
|
||||
tag=listener.get("tag"),
|
||||
)
|
||||
|
||||
return ListenerConfig(port, bind_addresses, listener_type, tls, http_config)
|
||||
|
||||
|
||||
NO_MORE_WEB_CLIENT_WARNING = """
|
||||
Synapse no longer includes a web client. To enable a web client, configure
|
||||
web_client_location. To remove this warning, remove 'webclient' from the 'listeners'
|
||||
|
@ -1088,40 +1166,27 @@ configuration.
|
|||
"""
|
||||
|
||||
|
||||
def _warn_if_webclient_configured(listeners):
|
||||
def _warn_if_webclient_configured(listeners: Iterable[ListenerConfig]) -> None:
|
||||
for listener in listeners:
|
||||
for res in listener.get("resources", []):
|
||||
for name in res.get("names", []):
|
||||
if not listener.http_options:
|
||||
continue
|
||||
for res in listener.http_options.resources:
|
||||
for name in res.names:
|
||||
if name == "webclient":
|
||||
logger.warning(NO_MORE_WEB_CLIENT_WARNING)
|
||||
return
|
||||
|
||||
|
||||
KNOWN_RESOURCES = (
|
||||
"client",
|
||||
"consent",
|
||||
"federation",
|
||||
"keys",
|
||||
"media",
|
||||
"metrics",
|
||||
"openid",
|
||||
"replication",
|
||||
"static",
|
||||
"webclient",
|
||||
)
|
||||
|
||||
|
||||
def _check_resource_config(listeners):
|
||||
def _check_resource_config(listeners: Iterable[ListenerConfig]) -> None:
|
||||
resource_names = {
|
||||
res_name
|
||||
for listener in listeners
|
||||
for res in listener.get("resources", [])
|
||||
for res_name in res.get("names", [])
|
||||
if listener.http_options
|
||||
for res in listener.http_options.resources
|
||||
for res_name in res.names
|
||||
}
|
||||
|
||||
for resource in resource_names:
|
||||
if resource not in KNOWN_RESOURCES:
|
||||
raise ConfigError("Unknown listener resource '%s'" % (resource,))
|
||||
if resource == "consent":
|
||||
try:
|
||||
check_requirements("resources.consent")
|
||||
|
|
|
@ -20,8 +20,6 @@ from datetime import datetime
|
|||
from hashlib import sha256
|
||||
from typing import List
|
||||
|
||||
import six
|
||||
|
||||
from unpaddedbase64 import encode_base64
|
||||
|
||||
from OpenSSL import SSL, crypto
|
||||
|
@ -59,7 +57,7 @@ class TlsConfig(Config):
|
|||
logger.warning(ACME_SUPPORT_ENABLED_WARN)
|
||||
|
||||
# hyperlink complains on py2 if this is not a Unicode
|
||||
self.acme_url = six.text_type(
|
||||
self.acme_url = str(
|
||||
acme_config.get("url", "https://acme-v01.api.letsencrypt.org/directory")
|
||||
)
|
||||
self.acme_port = acme_config.get("port", 80)
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
import attr
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
from .server import ListenerConfig, parse_listener_def
|
||||
|
||||
|
||||
@attr.s
|
||||
|
@ -52,7 +53,9 @@ class WorkerConfig(Config):
|
|||
if self.worker_app == "synapse.app.homeserver":
|
||||
self.worker_app = None
|
||||
|
||||
self.worker_listeners = config.get("worker_listeners", [])
|
||||
self.worker_listeners = [
|
||||
parse_listener_def(x) for x in config.get("worker_listeners", [])
|
||||
]
|
||||
self.worker_daemonize = config.get("worker_daemonize")
|
||||
self.worker_pid_file = config.get("worker_pid_file")
|
||||
self.worker_log_config = config.get("worker_log_config")
|
||||
|
@ -75,24 +78,11 @@ class WorkerConfig(Config):
|
|||
manhole = config.get("worker_manhole")
|
||||
if manhole:
|
||||
self.worker_listeners.append(
|
||||
{
|
||||
"port": manhole,
|
||||
"bind_addresses": ["127.0.0.1"],
|
||||
"type": "manhole",
|
||||
"tls": False,
|
||||
}
|
||||
ListenerConfig(
|
||||
port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
|
||||
)
|
||||
)
|
||||
|
||||
if self.worker_listeners:
|
||||
for listener in self.worker_listeners:
|
||||
bind_address = listener.pop("bind_address", None)
|
||||
bind_addresses = listener.setdefault("bind_addresses", [])
|
||||
|
||||
if bind_address:
|
||||
bind_addresses.append(bind_address)
|
||||
elif not bind_addresses:
|
||||
bind_addresses.append("")
|
||||
|
||||
# A map from instance name to host/port of their HTTP replication endpoint.
|
||||
instance_map = config.get("instance_map") or {}
|
||||
self.instance_map = {
|
||||
|
|
|
@ -15,11 +15,9 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
from collections import defaultdict
|
||||
|
||||
import six
|
||||
from six.moves import urllib
|
||||
|
||||
import attr
|
||||
from signedjson.key import (
|
||||
decode_verify_key_bytes,
|
||||
|
@ -661,7 +659,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
for response in query_response["server_keys"]:
|
||||
# do this first, so that we can give useful errors thereafter
|
||||
server_name = response.get("server_name")
|
||||
if not isinstance(server_name, six.string_types):
|
||||
if not isinstance(server_name, str):
|
||||
raise KeyLookupError(
|
||||
"Malformed response from key notary server %s: invalid server_name"
|
||||
% (perspective_name,)
|
||||
|
|
|
@ -20,8 +20,6 @@ import os
|
|||
from distutils.util import strtobool
|
||||
from typing import Dict, Optional, Type
|
||||
|
||||
import six
|
||||
|
||||
from unpaddedbase64 import encode_base64
|
||||
|
||||
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
|
||||
|
@ -290,7 +288,7 @@ class EventBase(metaclass=abc.ABCMeta):
|
|||
return list(self._dict.items())
|
||||
|
||||
def keys(self):
|
||||
return six.iterkeys(self._dict)
|
||||
return self._dict.keys()
|
||||
|
||||
def prev_event_ids(self):
|
||||
"""Returns the list of prev event IDs. The order matches the order
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
# limitations under the License.
|
||||
from typing import Optional, Union
|
||||
|
||||
from six import iteritems
|
||||
|
||||
import attr
|
||||
from frozendict import frozendict
|
||||
|
||||
|
@ -341,7 +339,7 @@ def _encode_state_dict(state_dict):
|
|||
if state_dict is None:
|
||||
return None
|
||||
|
||||
return [(etype, state_key, v) for (etype, state_key), v in iteritems(state_dict)]
|
||||
return [(etype, state_key, v) for (etype, state_key), v in state_dict.items()]
|
||||
|
||||
|
||||
def _decode_state_dict(input):
|
||||
|
|
|
@ -16,8 +16,6 @@ import collections
|
|||
import re
|
||||
from typing import Any, Mapping, Union
|
||||
|
||||
from six import string_types
|
||||
|
||||
from frozendict import frozendict
|
||||
|
||||
from twisted.internet import defer
|
||||
|
@ -318,7 +316,7 @@ def serialize_event(
|
|||
|
||||
if only_event_fields:
|
||||
if not isinstance(only_event_fields, list) or not all(
|
||||
isinstance(f, string_types) for f in only_event_fields
|
||||
isinstance(f, str) for f in only_event_fields
|
||||
):
|
||||
raise TypeError("only_event_fields must be a list of strings")
|
||||
d = only_fields(d, only_event_fields)
|
||||
|
|
|
@ -13,8 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from six import integer_types, string_types
|
||||
|
||||
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.api.room_versions import EventFormatVersions
|
||||
|
@ -53,7 +51,7 @@ class EventValidator(object):
|
|||
event_strings = ["origin"]
|
||||
|
||||
for s in event_strings:
|
||||
if not isinstance(getattr(event, s), string_types):
|
||||
if not isinstance(getattr(event, s), str):
|
||||
raise SynapseError(400, "'%s' not a string type" % (s,))
|
||||
|
||||
# Depending on the room version, ensure the data is spec compliant JSON.
|
||||
|
@ -90,7 +88,7 @@ class EventValidator(object):
|
|||
max_lifetime = event.content.get("max_lifetime")
|
||||
|
||||
if min_lifetime is not None:
|
||||
if not isinstance(min_lifetime, integer_types):
|
||||
if not isinstance(min_lifetime, int):
|
||||
raise SynapseError(
|
||||
code=400,
|
||||
msg="'min_lifetime' must be an integer",
|
||||
|
@ -124,7 +122,7 @@ class EventValidator(object):
|
|||
)
|
||||
|
||||
if max_lifetime is not None:
|
||||
if not isinstance(max_lifetime, integer_types):
|
||||
if not isinstance(max_lifetime, int):
|
||||
raise SynapseError(
|
||||
code=400,
|
||||
msg="'max_lifetime' must be an integer",
|
||||
|
@ -183,7 +181,7 @@ class EventValidator(object):
|
|||
strings.append("state_key")
|
||||
|
||||
for s in strings:
|
||||
if not isinstance(getattr(event, s), string_types):
|
||||
if not isinstance(getattr(event, s), str):
|
||||
raise SynapseError(400, "Not '%s' a string type" % (s,))
|
||||
|
||||
RoomID.from_string(event.room_id)
|
||||
|
@ -223,7 +221,7 @@ class EventValidator(object):
|
|||
for s in keys:
|
||||
if s not in d:
|
||||
raise SynapseError(400, "'%s' not in content" % (s,))
|
||||
if not isinstance(d[s], string_types):
|
||||
if not isinstance(d[s], str):
|
||||
raise SynapseError(400, "'%s' not a string type" % (s,))
|
||||
|
||||
def _ensure_state_event(self, event):
|
||||
|
|
|
@ -17,8 +17,6 @@ import logging
|
|||
from collections import namedtuple
|
||||
from typing import Iterable, List
|
||||
|
||||
import six
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import Deferred, DeferredList
|
||||
from twisted.python.failure import Failure
|
||||
|
@ -93,8 +91,8 @@ class FederationBase(object):
|
|||
# *actual* redacted copy to be on the safe side.)
|
||||
redacted_event = prune_event(pdu)
|
||||
if set(redacted_event.keys()) == set(pdu.keys()) and set(
|
||||
six.iterkeys(redacted_event.content)
|
||||
) == set(six.iterkeys(pdu.content)):
|
||||
redacted_event.content.keys()
|
||||
) == set(pdu.content.keys()):
|
||||
logger.info(
|
||||
"Event %s seems to have been redacted; using our redacted "
|
||||
"copy",
|
||||
|
@ -294,7 +292,7 @@ def event_from_pdu_json(
|
|||
assert_params_in_dict(pdu_json, ("type", "depth"))
|
||||
|
||||
depth = pdu_json["depth"]
|
||||
if not isinstance(depth, six.integer_types):
|
||||
if not isinstance(depth, int):
|
||||
raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON)
|
||||
|
||||
if depth < 0:
|
||||
|
|
|
@ -17,9 +17,6 @@
|
|||
import logging
|
||||
from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
|
||||
|
||||
import six
|
||||
from six import iteritems
|
||||
|
||||
from canonicaljson import json
|
||||
from prometheus_client import Counter
|
||||
|
||||
|
@ -534,9 +531,9 @@ class FederationServer(FederationBase):
|
|||
",".join(
|
||||
(
|
||||
"%s for %s:%s" % (key_id, user_id, device_id)
|
||||
for user_id, user_keys in iteritems(json_result)
|
||||
for device_id, device_keys in iteritems(user_keys)
|
||||
for key_id, _ in iteritems(device_keys)
|
||||
for user_id, user_keys in json_result.items()
|
||||
for device_id, device_keys in user_keys.items()
|
||||
for key_id, _ in device_keys.items()
|
||||
)
|
||||
),
|
||||
)
|
||||
|
@ -752,7 +749,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
|
|||
|
||||
|
||||
def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
|
||||
if not isinstance(acl_entry, six.string_types):
|
||||
if not isinstance(acl_entry, str):
|
||||
logger.warning(
|
||||
"Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry)
|
||||
)
|
||||
|
|
|
@ -33,8 +33,6 @@ import logging
|
|||
from collections import namedtuple
|
||||
from typing import Dict, List, Tuple, Type
|
||||
|
||||
from six import iteritems
|
||||
|
||||
from sortedcontainers import SortedDict
|
||||
|
||||
from twisted.internet import defer
|
||||
|
@ -327,7 +325,7 @@ class FederationRemoteSendQueue(object):
|
|||
# stream position.
|
||||
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
|
||||
|
||||
for ((destination, edu_key), pos) in iteritems(keyed_edus):
|
||||
for ((destination, edu_key), pos) in keyed_edus.items():
|
||||
rows.append(
|
||||
(
|
||||
pos,
|
||||
|
@ -530,10 +528,10 @@ def process_rows_for_federation(transaction_queue, rows):
|
|||
states=[state], destinations=destinations
|
||||
)
|
||||
|
||||
for destination, edu_map in iteritems(buff.keyed_edus):
|
||||
for destination, edu_map in buff.keyed_edus.items():
|
||||
for key, edu in edu_map.items():
|
||||
transaction_queue.send_edu(edu, key)
|
||||
|
||||
for destination, edu_list in iteritems(buff.edus):
|
||||
for destination, edu_list in buff.edus.items():
|
||||
for edu in edu_list:
|
||||
transaction_queue.send_edu(edu, None)
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
import logging
|
||||
from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
from six import itervalues
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
|
@ -233,7 +231,7 @@ class FederationSender(object):
|
|||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(handle_room_events, evs)
|
||||
for evs in itervalues(events_by_room)
|
||||
for evs in events_by_room.values()
|
||||
],
|
||||
consumeErrors=True,
|
||||
)
|
||||
|
|
|
@ -15,10 +15,9 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from six.moves import urllib
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import Membership
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
import logging
|
||||
|
||||
from six import string_types
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.types import GroupID, RoomID, UserID, get_domain_from_id
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
|
@ -513,7 +511,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
|
|||
for keyname in ("name", "avatar_url", "short_description", "long_description"):
|
||||
if keyname in content:
|
||||
value = content[keyname]
|
||||
if not isinstance(value, string_types):
|
||||
if not isinstance(value, str):
|
||||
raise SynapseError(400, "%r value is not a string" % (keyname,))
|
||||
profile[keyname] = value
|
||||
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
|
||||
import logging
|
||||
|
||||
from six import itervalues
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
|
@ -125,7 +123,7 @@ class ApplicationServicesHandler(object):
|
|||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(handle_room_events, evs)
|
||||
for evs in itervalues(events_by_room)
|
||||
for evs in events_by_room.values()
|
||||
],
|
||||
consumeErrors=True,
|
||||
)
|
||||
|
|
|
@ -297,7 +297,7 @@ class AuthHandler(BaseHandler):
|
|||
|
||||
# Convert the URI and method to strings.
|
||||
uri = request.uri.decode("utf-8")
|
||||
method = request.uri.decode("utf-8")
|
||||
method = request.method.decode("utf-8")
|
||||
|
||||
# If there's no session ID, create a new session.
|
||||
if not sid:
|
||||
|
|
|
@ -14,11 +14,10 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
import xml.etree.ElementTree as ET
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
||||
from six.moves import urllib
|
||||
|
||||
from twisted.web.client import PartialDownloadError
|
||||
|
||||
from synapse.api.errors import Codes, LoginError
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
import logging
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from six import iteritems, itervalues
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api import errors
|
||||
|
@ -159,7 +157,7 @@ class DeviceWorkerHandler(BaseHandler):
|
|||
# The user may have left the room
|
||||
# TODO: Check if they actually did or if we were just invited.
|
||||
if room_id not in room_ids:
|
||||
for key, event_id in iteritems(current_state_ids):
|
||||
for key, event_id in current_state_ids.items():
|
||||
etype, state_key = key
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
|
@ -182,7 +180,7 @@ class DeviceWorkerHandler(BaseHandler):
|
|||
log_kv(
|
||||
{"event": "encountered empty previous state", "room_id": room_id}
|
||||
)
|
||||
for key, event_id in iteritems(current_state_ids):
|
||||
for key, event_id in current_state_ids.items():
|
||||
etype, state_key = key
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
|
@ -198,10 +196,10 @@ class DeviceWorkerHandler(BaseHandler):
|
|||
|
||||
# Check if we've joined the room? If so we just blindly add all the users to
|
||||
# the "possibly changed" users.
|
||||
for state_dict in itervalues(prev_state_ids):
|
||||
for state_dict in prev_state_ids.values():
|
||||
member_event = state_dict.get((EventTypes.Member, user_id), None)
|
||||
if not member_event or member_event != current_member_id:
|
||||
for key, event_id in iteritems(current_state_ids):
|
||||
for key, event_id in current_state_ids.items():
|
||||
etype, state_key = key
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
|
@ -211,14 +209,14 @@ class DeviceWorkerHandler(BaseHandler):
|
|||
# If there has been any change in membership, include them in the
|
||||
# possibly changed list. We'll check if they are joined below,
|
||||
# and we're not toooo worried about spuriously adding users.
|
||||
for key, event_id in iteritems(current_state_ids):
|
||||
for key, event_id in current_state_ids.items():
|
||||
etype, state_key = key
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
|
||||
# check if this member has changed since any of the extremities
|
||||
# at the stream_ordering, and add them to the list if so.
|
||||
for state_dict in itervalues(prev_state_ids):
|
||||
for state_dict in prev_state_ids.values():
|
||||
prev_event_id = state_dict.get(key, None)
|
||||
if not prev_event_id or prev_event_id != event_id:
|
||||
if state_key != user_id:
|
||||
|
@ -693,6 +691,7 @@ class DeviceListUpdater(object):
|
|||
|
||||
return False
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def _maybe_retry_device_resync(self):
|
||||
"""Retry to resync device lists that are out of sync, except if another retry is
|
||||
|
|
|
@ -18,8 +18,6 @@ from typing import Any, Dict
|
|||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.opentracing import (
|
||||
|
@ -51,8 +49,7 @@ class DeviceMessageHandler(object):
|
|||
|
||||
self._device_list_updater = hs.get_device_handler().device_list_updater
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_direct_to_device_edu(self, origin, content):
|
||||
async def on_direct_to_device_edu(self, origin, content):
|
||||
local_messages = {}
|
||||
sender_user_id = content["sender"]
|
||||
if origin != get_domain_from_id(sender_user_id):
|
||||
|
@ -82,11 +79,11 @@ class DeviceMessageHandler(object):
|
|||
}
|
||||
local_messages[user_id] = messages_by_device
|
||||
|
||||
yield self._check_for_unknown_devices(
|
||||
await self._check_for_unknown_devices(
|
||||
message_type, sender_user_id, by_device
|
||||
)
|
||||
|
||||
stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
|
||||
stream_id = await self.store.add_messages_from_remote_to_device_inbox(
|
||||
origin, message_id, local_messages
|
||||
)
|
||||
|
||||
|
@ -94,14 +91,13 @@ class DeviceMessageHandler(object):
|
|||
"to_device_key", stream_id, users=local_messages.keys()
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _check_for_unknown_devices(
|
||||
async def _check_for_unknown_devices(
|
||||
self,
|
||||
message_type: str,
|
||||
sender_user_id: str,
|
||||
by_device: Dict[str, Dict[str, Any]],
|
||||
):
|
||||
"""Checks inbound device messages for unkown remote devices, and if
|
||||
"""Checks inbound device messages for unknown remote devices, and if
|
||||
found marks the remote cache for the user as stale.
|
||||
"""
|
||||
|
||||
|
@ -115,7 +111,7 @@ class DeviceMessageHandler(object):
|
|||
requesting_device_ids.add(device_id)
|
||||
|
||||
# Check if we are tracking the devices of the remote user.
|
||||
room_ids = yield self.store.get_rooms_for_user(sender_user_id)
|
||||
room_ids = await self.store.get_rooms_for_user(sender_user_id)
|
||||
if not room_ids:
|
||||
logger.info(
|
||||
"Received device message from remote device we don't"
|
||||
|
@ -127,7 +123,7 @@ class DeviceMessageHandler(object):
|
|||
|
||||
# If we are tracking check that we know about the sending
|
||||
# devices.
|
||||
cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id)
|
||||
cached_devices = await self.store.get_cached_devices_for_user(sender_user_id)
|
||||
|
||||
unknown_devices = requesting_device_ids - set(cached_devices)
|
||||
if unknown_devices:
|
||||
|
@ -136,15 +132,14 @@ class DeviceMessageHandler(object):
|
|||
sender_user_id,
|
||||
unknown_devices,
|
||||
)
|
||||
yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
|
||||
await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
|
||||
|
||||
# Immediately attempt a resync in the background
|
||||
run_in_background(
|
||||
self._device_list_updater.user_device_resync, sender_user_id
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send_device_message(self, sender_user_id, message_type, messages):
|
||||
async def send_device_message(self, sender_user_id, message_type, messages):
|
||||
set_tag("number_of_messages", len(messages))
|
||||
set_tag("sender", sender_user_id)
|
||||
local_messages = {}
|
||||
|
@ -183,7 +178,7 @@ class DeviceMessageHandler(object):
|
|||
}
|
||||
|
||||
log_kv({"local_messages": local_messages})
|
||||
stream_id = yield self.store.add_messages_to_device_inbox(
|
||||
stream_id = await self.store.add_messages_to_device_inbox(
|
||||
local_messages, remote_edu_contents
|
||||
)
|
||||
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
import logging
|
||||
|
||||
from six import iteritems
|
||||
|
||||
import attr
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
|
@ -135,7 +133,7 @@ class E2eKeysHandler(object):
|
|||
remote_queries_not_in_cache = {}
|
||||
if remote_queries:
|
||||
query_list = []
|
||||
for user_id, device_ids in iteritems(remote_queries):
|
||||
for user_id, device_ids in remote_queries.items():
|
||||
if device_ids:
|
||||
query_list.extend((user_id, device_id) for device_id in device_ids)
|
||||
else:
|
||||
|
@ -145,9 +143,9 @@ class E2eKeysHandler(object):
|
|||
user_ids_not_in_cache,
|
||||
remote_results,
|
||||
) = yield self.store.get_user_devices_from_cache(query_list)
|
||||
for user_id, devices in iteritems(remote_results):
|
||||
for user_id, devices in remote_results.items():
|
||||
user_devices = results.setdefault(user_id, {})
|
||||
for device_id, device in iteritems(devices):
|
||||
for device_id, device in devices.items():
|
||||
keys = device.get("keys", None)
|
||||
device_display_name = device.get("device_display_name", None)
|
||||
if keys:
|
||||
|
@ -446,9 +444,9 @@ class E2eKeysHandler(object):
|
|||
",".join(
|
||||
(
|
||||
"%s for %s:%s" % (key_id, user_id, device_id)
|
||||
for user_id, user_keys in iteritems(json_result)
|
||||
for device_id, device_keys in iteritems(user_keys)
|
||||
for key_id, _ in iteritems(device_keys)
|
||||
for user_id, user_keys in json_result.items()
|
||||
for device_id, device_keys in user_keys.items()
|
||||
for key_id, _ in device_keys.items()
|
||||
)
|
||||
),
|
||||
)
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
|
||||
import logging
|
||||
|
||||
from six import iteritems
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import (
|
||||
|
@ -205,8 +203,8 @@ class E2eRoomKeysHandler(object):
|
|||
)
|
||||
to_insert = [] # batch the inserts together
|
||||
changed = False # if anything has changed, we need to update the etag
|
||||
for room_id, room in iteritems(room_keys["rooms"]):
|
||||
for session_id, room_key in iteritems(room["sessions"]):
|
||||
for room_id, room in room_keys["rooms"].items():
|
||||
for session_id, room_key in room["sessions"].items():
|
||||
if not isinstance(room_key["is_verified"], bool):
|
||||
msg = (
|
||||
"is_verified must be a boolean in keys for session %s in"
|
||||
|
@ -351,6 +349,7 @@ class E2eRoomKeysHandler(object):
|
|||
raise
|
||||
|
||||
res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
|
||||
res["etag"] = str(res["etag"])
|
||||
return res
|
||||
|
||||
@trace
|
||||
|
|
|
@ -19,12 +19,9 @@
|
|||
|
||||
import itertools
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
|
||||
|
||||
import six
|
||||
from six import iteritems, itervalues
|
||||
from six.moves import http_client, zip
|
||||
|
||||
import attr
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
from signedjson.sign import verify_signed_json
|
||||
|
@ -33,7 +30,12 @@ from unpaddedbase64 import decode_base64
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse import event_auth
|
||||
from synapse.api.constants import EventTypes, Membership, RejectedReason
|
||||
from synapse.api.constants import (
|
||||
EventTypes,
|
||||
Membership,
|
||||
RejectedReason,
|
||||
RoomEncryptionAlgorithms,
|
||||
)
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
CodeMessageException,
|
||||
|
@ -393,7 +395,7 @@ class FederationHandler(BaseHandler):
|
|||
)
|
||||
event_map.update(evs)
|
||||
|
||||
state = [event_map[e] for e in six.itervalues(state_map)]
|
||||
state = [event_map[e] for e in state_map.values()]
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"[%s %s] Error attempting to resolve state at missing "
|
||||
|
@ -742,7 +744,10 @@ class FederationHandler(BaseHandler):
|
|||
if device:
|
||||
keys = device.get("keys", {}).get("keys", {})
|
||||
|
||||
if event.content.get("algorithm") == "m.megolm.v1.aes-sha2":
|
||||
if (
|
||||
event.content.get("algorithm")
|
||||
== RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
|
||||
):
|
||||
# For this algorithm we expect a curve25519 key.
|
||||
key_name = "curve25519:%s" % (device_id,)
|
||||
current_keys = [keys.get(key_name)]
|
||||
|
@ -1001,7 +1006,7 @@ class FederationHandler(BaseHandler):
|
|||
"""
|
||||
joined_users = [
|
||||
(state_key, int(event.depth))
|
||||
for (e_type, state_key), event in iteritems(state)
|
||||
for (e_type, state_key), event in state.items()
|
||||
if e_type == EventTypes.Member and event.membership == Membership.JOIN
|
||||
]
|
||||
|
||||
|
@ -1091,16 +1096,16 @@ class FederationHandler(BaseHandler):
|
|||
states = dict(zip(event_ids, [s.state for s in states]))
|
||||
|
||||
state_map = await self.store.get_events(
|
||||
[e_id for ids in itervalues(states) for e_id in itervalues(ids)],
|
||||
[e_id for ids in states.values() for e_id in ids.values()],
|
||||
get_prev_content=False,
|
||||
)
|
||||
states = {
|
||||
key: {
|
||||
k: state_map[e_id]
|
||||
for k, e_id in iteritems(state_dict)
|
||||
for k, e_id in state_dict.items()
|
||||
if e_id in state_map
|
||||
}
|
||||
for key, state_dict in iteritems(states)
|
||||
for key, state_dict in states.items()
|
||||
}
|
||||
|
||||
for e_id, _ in sorted_extremeties_tuple:
|
||||
|
@ -1188,7 +1193,7 @@ class FederationHandler(BaseHandler):
|
|||
ev.event_id,
|
||||
len(ev.prev_event_ids()),
|
||||
)
|
||||
raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
|
||||
|
||||
if len(ev.auth_event_ids()) > 10:
|
||||
logger.warning(
|
||||
|
@ -1196,7 +1201,7 @@ class FederationHandler(BaseHandler):
|
|||
ev.event_id,
|
||||
len(ev.auth_event_ids()),
|
||||
)
|
||||
raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events")
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
|
||||
|
||||
async def send_invite(self, target_host, event):
|
||||
""" Sends the invite to the remote server for signing.
|
||||
|
@ -1539,7 +1544,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
# block any attempts to invite the server notices mxid
|
||||
if event.state_key == self._server_notices_mxid:
|
||||
raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
|
||||
raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
|
||||
|
||||
# keep a record of the room version, if we don't yet know it.
|
||||
# (this may get overwritten if we later get a different room version in a
|
||||
|
@ -1725,7 +1730,7 @@ class FederationHandler(BaseHandler):
|
|||
state_groups = await self.state_store.get_state_groups(room_id, [event_id])
|
||||
|
||||
if state_groups:
|
||||
_, state = list(iteritems(state_groups)).pop()
|
||||
_, state = list(state_groups.items()).pop()
|
||||
results = {(e.type, e.state_key): e for e in state}
|
||||
|
||||
if event.is_state():
|
||||
|
@ -2088,7 +2093,7 @@ class FederationHandler(BaseHandler):
|
|||
room_version, state_sets, event
|
||||
)
|
||||
current_state_ids = {
|
||||
k: e.event_id for k, e in iteritems(current_state_ids)
|
||||
k: e.event_id for k, e in current_state_ids.items()
|
||||
}
|
||||
else:
|
||||
current_state_ids = await self.state_handler.get_current_state_ids(
|
||||
|
@ -2104,7 +2109,7 @@ class FederationHandler(BaseHandler):
|
|||
# Now check if event pass auth against said current state
|
||||
auth_types = auth_types_for_event(event)
|
||||
current_state_ids = [
|
||||
e for k, e in iteritems(current_state_ids) if k in auth_types
|
||||
e for k, e in current_state_ids.items() if k in auth_types
|
||||
]
|
||||
|
||||
current_auth_events = await self.store.get_events(current_state_ids)
|
||||
|
@ -2420,7 +2425,7 @@ class FederationHandler(BaseHandler):
|
|||
else:
|
||||
event_key = None
|
||||
state_updates = {
|
||||
k: a.event_id for k, a in iteritems(auth_events) if k != event_key
|
||||
k: a.event_id for k, a in auth_events.items() if k != event_key
|
||||
}
|
||||
|
||||
current_state_ids = await context.get_current_state_ids()
|
||||
|
@ -2431,7 +2436,7 @@ class FederationHandler(BaseHandler):
|
|||
prev_state_ids = await context.get_prev_state_ids()
|
||||
prev_state_ids = dict(prev_state_ids)
|
||||
|
||||
prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)})
|
||||
prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
|
||||
|
||||
# create a new state group as a delta from the existing one.
|
||||
prev_group = context.state_group
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
|
||||
import logging
|
||||
|
||||
from six import iteritems
|
||||
|
||||
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
|
||||
from synapse.types import get_domain_from_id
|
||||
|
||||
|
@ -227,7 +225,7 @@ class GroupsLocalWorkerHandler(object):
|
|||
|
||||
results = {}
|
||||
failed_results = []
|
||||
for destination, dest_user_ids in iteritems(destinations):
|
||||
for destination, dest_user_ids in destinations.items():
|
||||
try:
|
||||
r = await self.transport_client.bulk_get_publicised_groups(
|
||||
destination, list(dest_user_ids)
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
import logging
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from six import iteritems, itervalues, string_types
|
||||
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
@ -246,7 +244,7 @@ class MessageHandler(object):
|
|||
"avatar_url": profile.avatar_url,
|
||||
"display_name": profile.display_name,
|
||||
}
|
||||
for user_id, profile in iteritems(users_with_profile)
|
||||
for user_id, profile in users_with_profile.items()
|
||||
}
|
||||
|
||||
def maybe_schedule_expiry(self, event):
|
||||
|
@ -717,7 +715,7 @@ class EventCreationHandler(object):
|
|||
|
||||
spam_error = self.spam_checker.check_event_for_spam(event)
|
||||
if spam_error:
|
||||
if not isinstance(spam_error, string_types):
|
||||
if not isinstance(spam_error, str):
|
||||
spam_error = "Spam is not permitted here"
|
||||
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
|
||||
|
||||
|
@ -990,7 +988,7 @@ class EventCreationHandler(object):
|
|||
|
||||
state_to_include_ids = [
|
||||
e_id
|
||||
for k, e_id in iteritems(current_state_ids)
|
||||
for k, e_id in current_state_ids.items()
|
||||
if k[0] in self.room_invite_state_types
|
||||
or k == (EventTypes.Member, event.sender)
|
||||
]
|
||||
|
@ -1004,7 +1002,7 @@ class EventCreationHandler(object):
|
|||
"content": e.content,
|
||||
"sender": e.sender,
|
||||
}
|
||||
for e in itervalues(state_to_include)
|
||||
for e in state_to_include.values()
|
||||
]
|
||||
|
||||
invitee = UserID.from_string(event.state_key)
|
||||
|
|
|
@ -15,9 +15,6 @@
|
|||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
from six import iteritems
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
|
@ -99,8 +96,7 @@ class PaginationHandler(object):
|
|||
job["longest_max_lifetime"],
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def purge_history_for_rooms_in_range(self, min_ms, max_ms):
|
||||
async def purge_history_for_rooms_in_range(self, min_ms, max_ms):
|
||||
"""Purge outdated events from rooms within the given retention range.
|
||||
|
||||
If a default retention policy is defined in the server's configuration and its
|
||||
|
@ -139,13 +135,13 @@ class PaginationHandler(object):
|
|||
include_null,
|
||||
)
|
||||
|
||||
rooms = yield self.store.get_rooms_for_retention_period_in_range(
|
||||
rooms = await self.store.get_rooms_for_retention_period_in_range(
|
||||
min_ms, max_ms, include_null
|
||||
)
|
||||
|
||||
logger.debug("[purge] Rooms to purge: %s", rooms)
|
||||
|
||||
for room_id, retention_policy in iteritems(rooms):
|
||||
for room_id, retention_policy in rooms.items():
|
||||
logger.info("[purge] Attempting to purge messages in room %s", room_id)
|
||||
|
||||
if room_id in self._purges_in_progress_by_room:
|
||||
|
@ -167,9 +163,9 @@ class PaginationHandler(object):
|
|||
# Figure out what token we should start purging at.
|
||||
ts = self.clock.time_msec() - max_lifetime
|
||||
|
||||
stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
|
||||
stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
|
||||
|
||||
r = yield self.store.get_room_event_before_stream_ordering(
|
||||
r = await self.store.get_room_event_before_stream_ordering(
|
||||
room_id, stream_ordering,
|
||||
)
|
||||
if not r:
|
||||
|
@ -229,8 +225,7 @@ class PaginationHandler(object):
|
|||
)
|
||||
return purge_id
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _purge_history(self, purge_id, room_id, token, delete_local_events):
|
||||
async def _purge_history(self, purge_id, room_id, token, delete_local_events):
|
||||
"""Carry out a history purge on a room.
|
||||
|
||||
Args:
|
||||
|
@ -239,14 +234,11 @@ class PaginationHandler(object):
|
|||
token (str): topological token to delete events before
|
||||
delete_local_events (bool): True to delete local events as well as
|
||||
remote ones
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
"""
|
||||
self._purges_in_progress_by_room.add(room_id)
|
||||
try:
|
||||
with (yield self.pagination_lock.write(room_id)):
|
||||
yield self.storage.purge_events.purge_history(
|
||||
with await self.pagination_lock.write(room_id):
|
||||
await self.storage.purge_events.purge_history(
|
||||
room_id, token, delete_local_events
|
||||
)
|
||||
logger.info("[purge] complete")
|
||||
|
@ -284,9 +276,7 @@ class PaginationHandler(object):
|
|||
await self.store.get_room_version_id(room_id)
|
||||
|
||||
# first check that we have no users in this room
|
||||
joined = await defer.maybeDeferred(
|
||||
self.store.is_host_joined, room_id, self._server_name
|
||||
)
|
||||
joined = await self.store.is_host_joined(room_id, self._server_name)
|
||||
|
||||
if joined:
|
||||
raise SynapseError(400, "Users are still joined to this room")
|
||||
|
|
|
@ -25,9 +25,7 @@ The methods that define policy are:
|
|||
import abc
|
||||
import logging
|
||||
from contextlib import contextmanager
|
||||
from typing import Dict, Iterable, List, Set
|
||||
|
||||
from six import iteritems, itervalues
|
||||
from typing import Dict, Iterable, List, Set, Tuple
|
||||
|
||||
from prometheus_client import Counter
|
||||
from typing_extensions import ContextManager
|
||||
|
@ -170,14 +168,14 @@ class BasePresenceHandler(abc.ABC):
|
|||
for user_id in user_ids
|
||||
}
|
||||
|
||||
missing = [user_id for user_id, state in iteritems(states) if not state]
|
||||
missing = [user_id for user_id, state in states.items() if not state]
|
||||
if missing:
|
||||
# There are things not in our in memory cache. Lets pull them out of
|
||||
# the database.
|
||||
res = await self.store.get_presence_for_users(missing)
|
||||
states.update(res)
|
||||
|
||||
missing = [user_id for user_id, state in iteritems(states) if not state]
|
||||
missing = [user_id for user_id, state in states.items() if not state]
|
||||
if missing:
|
||||
new = {
|
||||
user_id: UserPresenceState.default(user_id) for user_id in missing
|
||||
|
@ -632,7 +630,7 @@ class PresenceHandler(BasePresenceHandler):
|
|||
await self._update_states(
|
||||
[
|
||||
prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
|
||||
for prev_state in itervalues(prev_states)
|
||||
for prev_state in prev_states.values()
|
||||
]
|
||||
)
|
||||
self.external_process_last_updated_ms.pop(process_id, None)
|
||||
|
@ -775,7 +773,9 @@ class PresenceHandler(BasePresenceHandler):
|
|||
|
||||
return False
|
||||
|
||||
async def get_all_presence_updates(self, last_id, current_id, limit):
|
||||
async def get_all_presence_updates(
|
||||
self, instance_name: str, last_id: int, current_id: int, limit: int
|
||||
) -> Tuple[List[Tuple[int, list]], int, bool]:
|
||||
"""
|
||||
Gets a list of presence update rows from between the given stream ids.
|
||||
Each row has:
|
||||
|
@ -787,10 +787,31 @@ class PresenceHandler(BasePresenceHandler):
|
|||
- last_user_sync_ts(int)
|
||||
- status_msg(int)
|
||||
- currently_active(int)
|
||||
|
||||
Args:
|
||||
instance_name: The writer we want to fetch updates from. Unused
|
||||
here since there is only ever one writer.
|
||||
last_id: The token to fetch updates from. Exclusive.
|
||||
current_id: The token to fetch updates up to. Inclusive.
|
||||
limit: The requested limit for the number of rows to return. The
|
||||
function may return more or fewer rows.
|
||||
|
||||
Returns:
|
||||
A tuple consisting of: the updates, a token to use to fetch
|
||||
subsequent updates, and whether we returned fewer rows than exists
|
||||
between the requested tokens due to the limit.
|
||||
|
||||
The token returned can be used in a subsequent call to this
|
||||
function to get further updatees.
|
||||
|
||||
The updates are a list of 2-tuples of stream ID and the row data
|
||||
"""
|
||||
|
||||
# TODO(markjh): replicate the unpersisted changes.
|
||||
# This could use the in-memory stores for recent changes.
|
||||
rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
|
||||
rows = await self.store.get_all_presence_updates(
|
||||
instance_name, last_id, current_id, limit
|
||||
)
|
||||
return rows
|
||||
|
||||
def notify_new_event(self):
|
||||
|
@ -1087,7 +1108,7 @@ class PresenceEventSource(object):
|
|||
return (list(updates.values()), max_token)
|
||||
else:
|
||||
return (
|
||||
[s for s in itervalues(updates) if s.state != PresenceState.OFFLINE],
|
||||
[s for s in updates.values() if s.state != PresenceState.OFFLINE],
|
||||
max_token,
|
||||
)
|
||||
|
||||
|
@ -1323,11 +1344,11 @@ def get_interested_remotes(store, states, state_handler):
|
|||
# hosts in those rooms.
|
||||
room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
|
||||
|
||||
for room_id, states in iteritems(room_ids_to_states):
|
||||
for room_id, states in room_ids_to_states.items():
|
||||
hosts = yield state_handler.get_current_hosts_in_room(room_id)
|
||||
hosts_and_states.append((hosts, states))
|
||||
|
||||
for user_id, states in iteritems(users_to_states):
|
||||
for user_id, states in users_to_states.items():
|
||||
host = get_domain_from_id(user_id)
|
||||
hosts_and_states.append(([host], states))
|
||||
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
|
||||
import logging
|
||||
|
||||
from six import raise_from
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import (
|
||||
|
@ -84,7 +82,7 @@ class BaseProfileHandler(BaseHandler):
|
|||
)
|
||||
return result
|
||||
except RequestSendFailed as e:
|
||||
raise_from(SynapseError(502, "Failed to fetch profile"), e)
|
||||
raise SynapseError(502, "Failed to fetch profile") from e
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse_error()
|
||||
|
||||
|
@ -135,7 +133,7 @@ class BaseProfileHandler(BaseHandler):
|
|||
ignore_backoff=True,
|
||||
)
|
||||
except RequestSendFailed as e:
|
||||
raise_from(SynapseError(502, "Failed to fetch profile"), e)
|
||||
raise SynapseError(502, "Failed to fetch profile") from e
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse_error()
|
||||
|
||||
|
@ -212,7 +210,7 @@ class BaseProfileHandler(BaseHandler):
|
|||
ignore_backoff=True,
|
||||
)
|
||||
except RequestSendFailed as e:
|
||||
raise_from(SynapseError(502, "Failed to fetch profile"), e)
|
||||
raise SynapseError(502, "Failed to fetch profile") from e
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse_error()
|
||||
|
||||
|
|
|
@ -24,9 +24,12 @@ import string
|
|||
from collections import OrderedDict
|
||||
from typing import Tuple
|
||||
|
||||
from six import iteritems, string_types
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
|
||||
from synapse.api.constants import (
|
||||
EventTypes,
|
||||
JoinRules,
|
||||
RoomCreationPreset,
|
||||
RoomEncryptionAlgorithms,
|
||||
)
|
||||
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
|
||||
from synapse.events.utils import copy_power_levels_contents
|
||||
|
@ -56,31 +59,6 @@ FIVE_MINUTES_IN_MS = 5 * 60 * 1000
|
|||
|
||||
|
||||
class RoomCreationHandler(BaseHandler):
|
||||
|
||||
PRESETS_DICT = {
|
||||
RoomCreationPreset.PRIVATE_CHAT: {
|
||||
"join_rules": JoinRules.INVITE,
|
||||
"history_visibility": "shared",
|
||||
"original_invitees_have_ops": False,
|
||||
"guest_can_join": True,
|
||||
"power_level_content_override": {"invite": 0},
|
||||
},
|
||||
RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
|
||||
"join_rules": JoinRules.INVITE,
|
||||
"history_visibility": "shared",
|
||||
"original_invitees_have_ops": True,
|
||||
"guest_can_join": True,
|
||||
"power_level_content_override": {"invite": 0},
|
||||
},
|
||||
RoomCreationPreset.PUBLIC_CHAT: {
|
||||
"join_rules": JoinRules.PUBLIC,
|
||||
"history_visibility": "shared",
|
||||
"original_invitees_have_ops": False,
|
||||
"guest_can_join": False,
|
||||
"power_level_content_override": {},
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, hs):
|
||||
super(RoomCreationHandler, self).__init__(hs)
|
||||
|
||||
|
@ -89,6 +67,39 @@ class RoomCreationHandler(BaseHandler):
|
|||
self.room_member_handler = hs.get_room_member_handler()
|
||||
self.config = hs.config
|
||||
|
||||
# Room state based off defined presets
|
||||
self._presets_dict = {
|
||||
RoomCreationPreset.PRIVATE_CHAT: {
|
||||
"join_rules": JoinRules.INVITE,
|
||||
"history_visibility": "shared",
|
||||
"original_invitees_have_ops": False,
|
||||
"guest_can_join": True,
|
||||
"power_level_content_override": {"invite": 0},
|
||||
},
|
||||
RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
|
||||
"join_rules": JoinRules.INVITE,
|
||||
"history_visibility": "shared",
|
||||
"original_invitees_have_ops": True,
|
||||
"guest_can_join": True,
|
||||
"power_level_content_override": {"invite": 0},
|
||||
},
|
||||
RoomCreationPreset.PUBLIC_CHAT: {
|
||||
"join_rules": JoinRules.PUBLIC,
|
||||
"history_visibility": "shared",
|
||||
"original_invitees_have_ops": False,
|
||||
"guest_can_join": False,
|
||||
"power_level_content_override": {},
|
||||
},
|
||||
}
|
||||
|
||||
# Modify presets to selectively enable encryption by default per homeserver config
|
||||
for preset_name, preset_config in self._presets_dict.items():
|
||||
encrypted = (
|
||||
preset_name
|
||||
in self.config.encryption_enabled_by_default_for_room_presets
|
||||
)
|
||||
preset_config["encrypted"] = encrypted
|
||||
|
||||
self._replication = hs.get_replication_data_handler()
|
||||
|
||||
# linearizer to stop two upgrades happening at once
|
||||
|
@ -364,7 +375,7 @@ class RoomCreationHandler(BaseHandler):
|
|||
# map from event_id to BaseEvent
|
||||
old_room_state_events = await self.store.get_events(old_room_state_ids.values())
|
||||
|
||||
for k, old_event_id in iteritems(old_room_state_ids):
|
||||
for k, old_event_id in old_room_state_ids.items():
|
||||
old_event = old_room_state_events.get(old_event_id)
|
||||
if old_event:
|
||||
initial_state[k] = old_event.content
|
||||
|
@ -417,7 +428,7 @@ class RoomCreationHandler(BaseHandler):
|
|||
old_room_member_state_events = await self.store.get_events(
|
||||
old_room_member_state_ids.values()
|
||||
)
|
||||
for k, old_event in iteritems(old_room_member_state_events):
|
||||
for k, old_event in old_room_member_state_events.items():
|
||||
# Only transfer ban events
|
||||
if (
|
||||
"membership" in old_event.content
|
||||
|
@ -582,7 +593,7 @@ class RoomCreationHandler(BaseHandler):
|
|||
"room_version", self.config.default_room_version.identifier
|
||||
)
|
||||
|
||||
if not isinstance(room_version_id, string_types):
|
||||
if not isinstance(room_version_id, str):
|
||||
raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON)
|
||||
|
||||
room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
|
||||
|
@ -798,7 +809,7 @@ class RoomCreationHandler(BaseHandler):
|
|||
)
|
||||
return last_stream_id
|
||||
|
||||
config = RoomCreationHandler.PRESETS_DICT[preset_config]
|
||||
config = self._presets_dict[preset_config]
|
||||
|
||||
creator_id = creator.user.to_string()
|
||||
|
||||
|
@ -888,6 +899,13 @@ class RoomCreationHandler(BaseHandler):
|
|||
etype=etype, state_key=state_key, content=content
|
||||
)
|
||||
|
||||
if config["encrypted"]:
|
||||
last_sent_stream_id = await send(
|
||||
etype=EventTypes.RoomEncryption,
|
||||
state_key="",
|
||||
content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
|
||||
)
|
||||
|
||||
return last_sent_stream_id
|
||||
|
||||
async def _generate_room_id(
|
||||
|
|
|
@ -17,8 +17,6 @@ import logging
|
|||
from collections import namedtuple
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from six import iteritems
|
||||
|
||||
import msgpack
|
||||
from unpaddedbase64 import decode_base64, encode_base64
|
||||
|
||||
|
@ -272,7 +270,7 @@ class RoomListHandler(BaseHandler):
|
|||
event_map = yield self.store.get_events(
|
||||
[
|
||||
event_id
|
||||
for key, event_id in iteritems(current_state_ids)
|
||||
for key, event_id in current_state_ids.items()
|
||||
if key[0]
|
||||
in (
|
||||
EventTypes.Create,
|
||||
|
|
|
@ -17,10 +17,9 @@
|
|||
|
||||
import abc
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
from typing import Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
from six.moves import http_client
|
||||
|
||||
from synapse import types
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError, Codes, SynapseError
|
||||
|
@ -381,7 +380,7 @@ class RoomMemberHandler(object):
|
|||
if effective_membership_state == Membership.INVITE:
|
||||
# block any attempts to invite the server notices mxid
|
||||
if target.to_string() == self._server_notices_mxid:
|
||||
raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
|
||||
raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
|
||||
|
||||
block_invite = False
|
||||
|
||||
|
@ -464,7 +463,7 @@ class RoomMemberHandler(object):
|
|||
is_blocked = await self._is_server_notice_room(room_id)
|
||||
if is_blocked:
|
||||
raise SynapseError(
|
||||
http_client.FORBIDDEN,
|
||||
HTTPStatus.FORBIDDEN,
|
||||
"You cannot reject this invite",
|
||||
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
|
||||
)
|
||||
|
|
|
@ -18,8 +18,6 @@ import itertools
|
|||
import logging
|
||||
from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
|
||||
|
||||
from six import iteritems, itervalues
|
||||
|
||||
import attr
|
||||
from prometheus_client import Counter
|
||||
|
||||
|
@ -393,7 +391,7 @@ class SyncHandler(object):
|
|||
# result returned by the event source is poor form (it might cache
|
||||
# the object)
|
||||
room_id = event["room_id"]
|
||||
event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
|
||||
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
|
||||
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
||||
|
||||
receipt_key = since_token.receipt_key if since_token else "0"
|
||||
|
@ -411,7 +409,7 @@ class SyncHandler(object):
|
|||
for event in receipts:
|
||||
room_id = event["room_id"]
|
||||
# exclude room id, as above
|
||||
event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
|
||||
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
|
||||
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
||||
|
||||
return now_token, ephemeral_by_room
|
||||
|
@ -457,7 +455,7 @@ class SyncHandler(object):
|
|||
current_state_ids_map = await self.state.get_current_state_ids(
|
||||
room_id
|
||||
)
|
||||
current_state_ids = frozenset(itervalues(current_state_ids_map))
|
||||
current_state_ids = frozenset(current_state_ids_map.values())
|
||||
|
||||
recents = await filter_events_for_client(
|
||||
self.storage,
|
||||
|
@ -512,7 +510,7 @@ class SyncHandler(object):
|
|||
current_state_ids_map = await self.state.get_current_state_ids(
|
||||
room_id
|
||||
)
|
||||
current_state_ids = frozenset(itervalues(current_state_ids_map))
|
||||
current_state_ids = frozenset(current_state_ids_map.values())
|
||||
|
||||
loaded_recents = await filter_events_for_client(
|
||||
self.storage,
|
||||
|
@ -912,7 +910,7 @@ class SyncHandler(object):
|
|||
logger.debug("filtering state from %r...", state_ids)
|
||||
state_ids = {
|
||||
t: event_id
|
||||
for t, event_id in iteritems(state_ids)
|
||||
for t, event_id in state_ids.items()
|
||||
if cache.get(t[1]) != event_id
|
||||
}
|
||||
logger.debug("...to %r", state_ids)
|
||||
|
@ -1433,7 +1431,7 @@ class SyncHandler(object):
|
|||
if since_token:
|
||||
for joined_sync in sync_result_builder.joined:
|
||||
it = itertools.chain(
|
||||
joined_sync.timeline.events, itervalues(joined_sync.state)
|
||||
joined_sync.timeline.events, joined_sync.state.values()
|
||||
)
|
||||
for event in it:
|
||||
if event.type == EventTypes.Member:
|
||||
|
@ -1508,7 +1506,7 @@ class SyncHandler(object):
|
|||
newly_left_rooms = []
|
||||
room_entries = []
|
||||
invited = []
|
||||
for room_id, events in iteritems(mem_change_events_by_room_id):
|
||||
for room_id, events in mem_change_events_by_room_id.items():
|
||||
logger.debug(
|
||||
"Membership changes in %s: [%s]",
|
||||
room_id,
|
||||
|
@ -1898,6 +1896,9 @@ class SyncHandler(object):
|
|||
if notifs is not None:
|
||||
unread_notifications["notification_count"] = notifs["notify_count"]
|
||||
unread_notifications["highlight_count"] = notifs["highlight_count"]
|
||||
unread_notifications["org.matrix.msc2625.unread_count"] = notifs[
|
||||
"unread_count"
|
||||
]
|
||||
|
||||
sync_result_builder.joined.append(room_sync)
|
||||
|
||||
|
@ -1996,17 +1997,17 @@ def _calculate_state(
|
|||
event_id_to_key = {
|
||||
e: key
|
||||
for key, e in itertools.chain(
|
||||
iteritems(timeline_contains),
|
||||
iteritems(previous),
|
||||
iteritems(timeline_start),
|
||||
iteritems(current),
|
||||
timeline_contains.items(),
|
||||
previous.items(),
|
||||
timeline_start.items(),
|
||||
current.items(),
|
||||
)
|
||||
}
|
||||
|
||||
c_ids = set(itervalues(current))
|
||||
ts_ids = set(itervalues(timeline_start))
|
||||
p_ids = set(itervalues(previous))
|
||||
tc_ids = set(itervalues(timeline_contains))
|
||||
c_ids = set(current.values())
|
||||
ts_ids = set(timeline_start.values())
|
||||
p_ids = set(previous.values())
|
||||
tc_ids = set(timeline_contains.values())
|
||||
|
||||
# If we are lazyloading room members, we explicitly add the membership events
|
||||
# for the senders in the timeline into the state block returned by /sync,
|
||||
|
@ -2020,7 +2021,7 @@ def _calculate_state(
|
|||
|
||||
if lazy_load_members:
|
||||
p_ids.difference_update(
|
||||
e for t, e in iteritems(timeline_start) if t[0] == EventTypes.Member
|
||||
e for t, e in timeline_start.items() if t[0] == EventTypes.Member
|
||||
)
|
||||
|
||||
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from typing import List
|
||||
from typing import List, Tuple
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
@ -259,14 +259,31 @@ class TypingHandler(object):
|
|||
)
|
||||
|
||||
async def get_all_typing_updates(
|
||||
self, last_id: int, current_id: int, limit: int
|
||||
) -> List[dict]:
|
||||
"""Get up to `limit` typing updates between the given tokens, earliest
|
||||
updates first.
|
||||
self, instance_name: str, last_id: int, current_id: int, limit: int
|
||||
) -> Tuple[List[Tuple[int, list]], int, bool]:
|
||||
"""Get updates for typing replication stream.
|
||||
|
||||
Args:
|
||||
instance_name: The writer we want to fetch updates from. Unused
|
||||
here since there is only ever one writer.
|
||||
last_id: The token to fetch updates from. Exclusive.
|
||||
current_id: The token to fetch updates up to. Inclusive.
|
||||
limit: The requested limit for the number of rows to return. The
|
||||
function may return more or fewer rows.
|
||||
|
||||
Returns:
|
||||
A tuple consisting of: the updates, a token to use to fetch
|
||||
subsequent updates, and whether we returned fewer rows than exists
|
||||
between the requested tokens due to the limit.
|
||||
|
||||
The token returned can be used in a subsequent call to this
|
||||
function to get further updatees.
|
||||
|
||||
The updates are a list of 2-tuples of stream ID and the row data
|
||||
"""
|
||||
|
||||
if last_id == current_id:
|
||||
return []
|
||||
return [], current_id, False
|
||||
|
||||
changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
|
||||
last_id
|
||||
|
@ -280,9 +297,16 @@ class TypingHandler(object):
|
|||
serial = self._room_serials[room_id]
|
||||
if last_id < serial <= current_id:
|
||||
typing = self._room_typing[room_id]
|
||||
rows.append((serial, room_id, list(typing)))
|
||||
rows.append((serial, [room_id, list(typing)]))
|
||||
rows.sort()
|
||||
return rows[:limit]
|
||||
|
||||
limited = False
|
||||
if len(rows) > limit:
|
||||
rows = rows[:limit]
|
||||
current_id = rows[-1][0]
|
||||
limited = True
|
||||
|
||||
return rows, current_id, limited
|
||||
|
||||
def get_current_token(self):
|
||||
return self._latest_room_serial
|
||||
|
|
|
@ -15,8 +15,6 @@
|
|||
|
||||
import logging
|
||||
|
||||
from six import iteritems, iterkeys
|
||||
|
||||
import synapse.metrics
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
from synapse.handlers.state_deltas import StateDeltasHandler
|
||||
|
@ -289,7 +287,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
|||
users_with_profile = await self.state.get_current_users_in_room(room_id)
|
||||
|
||||
# Remove every user from the sharing tables for that room.
|
||||
for user_id in iterkeys(users_with_profile):
|
||||
for user_id in users_with_profile.keys():
|
||||
await self.store.remove_user_who_share_room(user_id, room_id)
|
||||
|
||||
# Then, re-add them to the tables.
|
||||
|
@ -298,7 +296,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
|||
# which when ran over an entire room, will result in the same values
|
||||
# being added multiple times. The batching upserts shouldn't make this
|
||||
# too bad, though.
|
||||
for user_id, profile in iteritems(users_with_profile):
|
||||
for user_id, profile in users_with_profile.items():
|
||||
await self._handle_new_user(room_id, user_id, profile)
|
||||
|
||||
async def _handle_new_user(self, room_id, user_id, profile):
|
||||
|
|
|
@ -15,11 +15,9 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
from io import BytesIO
|
||||
|
||||
from six import raise_from, text_type
|
||||
from six.moves import urllib
|
||||
|
||||
import treq
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
from netaddr import IPAddress
|
||||
|
@ -577,7 +575,7 @@ class SimpleHttpClient(object):
|
|||
# This can happen e.g. because the body is too large.
|
||||
raise
|
||||
except Exception as e:
|
||||
raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e)
|
||||
raise SynapseError(502, ("Failed to download remote body: %s" % e)) from e
|
||||
|
||||
return (
|
||||
length,
|
||||
|
@ -638,7 +636,7 @@ def encode_urlencode_args(args):
|
|||
|
||||
|
||||
def encode_urlencode_arg(arg):
|
||||
if isinstance(arg, text_type):
|
||||
if isinstance(arg, str):
|
||||
return arg.encode("utf-8")
|
||||
elif isinstance(arg, list):
|
||||
return [encode_urlencode_arg(i) for i in arg]
|
||||
|
|
|
@ -48,6 +48,9 @@ class MatrixFederationAgent(object):
|
|||
tls_client_options_factory (FederationPolicyForHTTPS|None):
|
||||
factory to use for fetching client tls options, or none to disable TLS.
|
||||
|
||||
user_agent (bytes):
|
||||
The user agent header to use for federation requests.
|
||||
|
||||
_srv_resolver (SrvResolver|None):
|
||||
SRVResolver impl to use for looking up SRV records. None to use a default
|
||||
implementation.
|
||||
|
@ -61,6 +64,7 @@ class MatrixFederationAgent(object):
|
|||
self,
|
||||
reactor,
|
||||
tls_client_options_factory,
|
||||
user_agent,
|
||||
_srv_resolver=None,
|
||||
_well_known_resolver=None,
|
||||
):
|
||||
|
@ -78,6 +82,7 @@ class MatrixFederationAgent(object):
|
|||
),
|
||||
pool=self._pool,
|
||||
)
|
||||
self.user_agent = user_agent
|
||||
|
||||
if _well_known_resolver is None:
|
||||
_well_known_resolver = WellKnownResolver(
|
||||
|
@ -87,6 +92,7 @@ class MatrixFederationAgent(object):
|
|||
pool=self._pool,
|
||||
contextFactory=tls_client_options_factory,
|
||||
),
|
||||
user_agent=self.user_agent,
|
||||
)
|
||||
|
||||
self._well_known_resolver = _well_known_resolver
|
||||
|
@ -149,7 +155,7 @@ class MatrixFederationAgent(object):
|
|||
parsed_uri = urllib.parse.urlparse(uri)
|
||||
|
||||
# We need to make sure the host header is set to the netloc of the
|
||||
# server.
|
||||
# server and that a user-agent is provided.
|
||||
if headers is None:
|
||||
headers = Headers()
|
||||
else:
|
||||
|
@ -157,6 +163,8 @@ class MatrixFederationAgent(object):
|
|||
|
||||
if not headers.hasHeader(b"host"):
|
||||
headers.addRawHeader(b"host", parsed_uri.netloc)
|
||||
if not headers.hasHeader(b"user-agent"):
|
||||
headers.addRawHeader(b"user-agent", self.user_agent)
|
||||
|
||||
res = yield make_deferred_yieldable(
|
||||
self._agent.request(method, uri, headers, bodyProducer)
|
||||
|
|
|
@ -23,6 +23,7 @@ import attr
|
|||
from twisted.internet import defer
|
||||
from twisted.web.client import RedirectAgent, readBody
|
||||
from twisted.web.http import stringToDatetime
|
||||
from twisted.web.http_headers import Headers
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.util import Clock
|
||||
|
@ -78,7 +79,12 @@ class WellKnownResolver(object):
|
|||
"""
|
||||
|
||||
def __init__(
|
||||
self, reactor, agent, well_known_cache=None, had_well_known_cache=None
|
||||
self,
|
||||
reactor,
|
||||
agent,
|
||||
user_agent,
|
||||
well_known_cache=None,
|
||||
had_well_known_cache=None,
|
||||
):
|
||||
self._reactor = reactor
|
||||
self._clock = Clock(reactor)
|
||||
|
@ -92,6 +98,7 @@ class WellKnownResolver(object):
|
|||
self._well_known_cache = well_known_cache
|
||||
self._had_valid_well_known_cache = had_well_known_cache
|
||||
self._well_known_agent = RedirectAgent(agent)
|
||||
self.user_agent = user_agent
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_well_known(self, server_name):
|
||||
|
@ -231,6 +238,10 @@ class WellKnownResolver(object):
|
|||
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
|
||||
uri_str = uri.decode("ascii")
|
||||
|
||||
headers = {
|
||||
b"User-Agent": [self.user_agent],
|
||||
}
|
||||
|
||||
i = 0
|
||||
while True:
|
||||
i += 1
|
||||
|
@ -238,7 +249,9 @@ class WellKnownResolver(object):
|
|||
logger.info("Fetching %s", uri_str)
|
||||
try:
|
||||
response = yield make_deferred_yieldable(
|
||||
self._well_known_agent.request(b"GET", uri)
|
||||
self._well_known_agent.request(
|
||||
b"GET", uri, headers=Headers(headers)
|
||||
)
|
||||
)
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
|
||||
|
|
|
@ -17,11 +17,9 @@ import cgi
|
|||
import logging
|
||||
import random
|
||||
import sys
|
||||
import urllib
|
||||
from io import BytesIO
|
||||
|
||||
from six import raise_from, string_types
|
||||
from six.moves import urllib
|
||||
|
||||
import attr
|
||||
import treq
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
@ -199,7 +197,14 @@ class MatrixFederationHttpClient(object):
|
|||
|
||||
self.reactor = Reactor()
|
||||
|
||||
self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
|
||||
user_agent = hs.version_string
|
||||
if hs.config.user_agent_suffix:
|
||||
user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix)
|
||||
user_agent = user_agent.encode("ascii")
|
||||
|
||||
self.agent = MatrixFederationAgent(
|
||||
self.reactor, tls_client_options_factory, user_agent
|
||||
)
|
||||
|
||||
# Use a BlacklistingAgentWrapper to prevent circumventing the IP
|
||||
# blacklist via IP literals in server names
|
||||
|
@ -432,10 +437,10 @@ class MatrixFederationHttpClient(object):
|
|||
except TimeoutError as e:
|
||||
raise RequestSendFailed(e, can_retry=True) from e
|
||||
except DNSLookupError as e:
|
||||
raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
|
||||
raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
|
||||
except Exception as e:
|
||||
logger.info("Failed to send request: %s", e)
|
||||
raise_from(RequestSendFailed(e, can_retry=True), e)
|
||||
raise RequestSendFailed(e, can_retry=True) from e
|
||||
|
||||
incoming_responses_counter.labels(
|
||||
request.method, response.code
|
||||
|
@ -487,7 +492,7 @@ class MatrixFederationHttpClient(object):
|
|||
# Retry if the error is a 429 (Too Many Requests),
|
||||
# otherwise just raise a standard HttpResponseException
|
||||
if response.code == 429:
|
||||
raise_from(RequestSendFailed(e, can_retry=True), e)
|
||||
raise RequestSendFailed(e, can_retry=True) from e
|
||||
else:
|
||||
raise e
|
||||
|
||||
|
@ -998,7 +1003,7 @@ def encode_query_args(args):
|
|||
|
||||
encoded_args = {}
|
||||
for k, vs in args.items():
|
||||
if isinstance(vs, string_types):
|
||||
if isinstance(vs, str):
|
||||
vs = [vs]
|
||||
encoded_args[k] = [v.encode("UTF-8") for v in vs]
|
||||
|
||||
|
|
|
@ -16,10 +16,10 @@
|
|||
|
||||
import collections
|
||||
import html
|
||||
import http.client
|
||||
import logging
|
||||
import types
|
||||
import urllib
|
||||
from http import HTTPStatus
|
||||
from io import BytesIO
|
||||
from typing import Awaitable, Callable, TypeVar, Union
|
||||
|
||||
|
@ -188,7 +188,7 @@ def return_html_error(
|
|||
exc_info=(f.type, f.value, f.getTracebackObject()),
|
||||
)
|
||||
else:
|
||||
code = http.HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
code = HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
msg = "Internal server error"
|
||||
|
||||
logger.error(
|
||||
|
|
|
@ -19,6 +19,7 @@ from typing import Optional
|
|||
from twisted.python.failure import Failure
|
||||
from twisted.web.server import Request, Site
|
||||
|
||||
from synapse.config.server import ListenerConfig
|
||||
from synapse.http import redact_uri
|
||||
from synapse.http.request_metrics import RequestMetrics, requests_counter
|
||||
from synapse.logging.context import LoggingContext, PreserveLoggingContext
|
||||
|
@ -350,7 +351,7 @@ class SynapseSite(Site):
|
|||
self,
|
||||
logger_name,
|
||||
site_tag,
|
||||
config,
|
||||
config: ListenerConfig,
|
||||
resource,
|
||||
server_version_string,
|
||||
*args,
|
||||
|
@ -360,7 +361,8 @@ class SynapseSite(Site):
|
|||
|
||||
self.site_tag = site_tag
|
||||
|
||||
proxied = config.get("x_forwarded", False)
|
||||
assert config.http_options is not None
|
||||
proxied = config.http_options.x_forwarded
|
||||
self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
|
||||
self.access_logger = logging.getLogger(logger_name)
|
||||
self.server_version_string = server_version_string.encode("ascii")
|
||||
|
|
|
@ -16,8 +16,7 @@
|
|||
|
||||
import logging
|
||||
import traceback
|
||||
|
||||
from six import StringIO
|
||||
from io import StringIO
|
||||
|
||||
|
||||
class LogFormatter(logging.Formatter):
|
||||
|
|
|
@ -171,8 +171,9 @@ import logging
|
|||
import re
|
||||
import types
|
||||
from functools import wraps
|
||||
from typing import TYPE_CHECKING, Dict
|
||||
from typing import TYPE_CHECKING, Dict, Optional, Type
|
||||
|
||||
import attr
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
@ -232,6 +233,30 @@ except ImportError:
|
|||
LogContextScopeManager = None # type: ignore
|
||||
|
||||
|
||||
try:
|
||||
from rust_python_jaeger_reporter import Reporter
|
||||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
class _WrappedRustReporter:
|
||||
"""Wrap the reporter to ensure `report_span` never throws.
|
||||
"""
|
||||
|
||||
_reporter = attr.ib(type=Reporter, default=attr.Factory(Reporter))
|
||||
|
||||
def set_process(self, *args, **kwargs):
|
||||
return self._reporter.set_process(*args, **kwargs)
|
||||
|
||||
def report_span(self, span):
|
||||
try:
|
||||
return self._reporter.report_span(span)
|
||||
except Exception:
|
||||
logger.exception("Failed to report span")
|
||||
|
||||
RustReporter = _WrappedRustReporter # type: Optional[Type[_WrappedRustReporter]]
|
||||
except ImportError:
|
||||
RustReporter = None
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -320,11 +345,19 @@ def init_tracer(hs: "HomeServer"):
|
|||
|
||||
set_homeserver_whitelist(hs.config.opentracer_whitelist)
|
||||
|
||||
JaegerConfig(
|
||||
config = JaegerConfig(
|
||||
config=hs.config.jaeger_config,
|
||||
service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()),
|
||||
scope_manager=LogContextScopeManager(hs.config),
|
||||
).initialize_tracer()
|
||||
)
|
||||
|
||||
# If we have the rust jaeger reporter available let's use that.
|
||||
if RustReporter:
|
||||
logger.info("Using rust_python_jaeger_reporter library")
|
||||
tracer = config.create_tracer(RustReporter(), config.sampler)
|
||||
opentracing.set_global_tracer(tracer)
|
||||
else:
|
||||
config.initialize_tracer()
|
||||
|
||||
|
||||
# Whitelisting
|
||||
|
|
|
@ -22,8 +22,6 @@ import threading
|
|||
import time
|
||||
from typing import Callable, Dict, Iterable, Optional, Tuple, Union
|
||||
|
||||
import six
|
||||
|
||||
import attr
|
||||
from prometheus_client import Counter, Gauge, Histogram
|
||||
from prometheus_client.core import (
|
||||
|
@ -83,7 +81,7 @@ class LaterGauge(object):
|
|||
return
|
||||
|
||||
if isinstance(calls, dict):
|
||||
for k, v in six.iteritems(calls):
|
||||
for k, v in calls.items():
|
||||
g.add_metric(k, v)
|
||||
else:
|
||||
g.add_metric([], calls)
|
||||
|
@ -194,7 +192,7 @@ class InFlightGauge(object):
|
|||
gauge = GaugeMetricFamily(
|
||||
"_".join([self.name, name]), "", labels=self.labels
|
||||
)
|
||||
for key, metrics in six.iteritems(metrics_by_key):
|
||||
for key, metrics in metrics_by_key.items():
|
||||
gauge.add_metric(key, getattr(metrics, name))
|
||||
yield gauge
|
||||
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
import logging
|
||||
from collections import namedtuple
|
||||
|
||||
from six import iteritems, itervalues
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
|
@ -130,7 +128,7 @@ class BulkPushRuleEvaluator(object):
|
|||
event, prev_state_ids, for_verification=False
|
||||
)
|
||||
auth_events = yield self.store.get_events(auth_events_ids)
|
||||
auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)}
|
||||
auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
|
||||
|
||||
sender_level = get_user_power_level(event.sender, auth_events)
|
||||
|
||||
|
@ -162,7 +160,7 @@ class BulkPushRuleEvaluator(object):
|
|||
|
||||
condition_cache = {}
|
||||
|
||||
for uid, rules in iteritems(rules_by_user):
|
||||
for uid, rules in rules_by_user.items():
|
||||
if event.sender == uid:
|
||||
continue
|
||||
|
||||
|
@ -191,8 +189,11 @@ class BulkPushRuleEvaluator(object):
|
|||
)
|
||||
if matches:
|
||||
actions = [x for x in rule["actions"] if x != "dont_notify"]
|
||||
if actions and "notify" in actions:
|
||||
# Push rules say we should notify the user of this event
|
||||
if (
|
||||
"notify" in actions
|
||||
or "org.matrix.msc2625.mark_unread" in actions
|
||||
):
|
||||
# Push rules say we should act on this event.
|
||||
actions_by_user[uid] = actions
|
||||
break
|
||||
|
||||
|
@ -395,7 +396,7 @@ class RulesForRoom(object):
|
|||
# If the event is a join event then it will be in current state evnts
|
||||
# map but not in the DB, so we have to explicitly insert it.
|
||||
if event.type == EventTypes.Member:
|
||||
for event_id in itervalues(member_event_ids):
|
||||
for event_id in member_event_ids.values():
|
||||
if event_id == event.event_id:
|
||||
members[event_id] = (event.state_key, event.membership)
|
||||
|
||||
|
@ -404,7 +405,7 @@ class RulesForRoom(object):
|
|||
|
||||
interested_in_user_ids = {
|
||||
user_id
|
||||
for user_id, membership in itervalues(members)
|
||||
for user_id, membership in members.values()
|
||||
if membership == Membership.JOIN
|
||||
}
|
||||
|
||||
|
@ -415,7 +416,7 @@ class RulesForRoom(object):
|
|||
)
|
||||
|
||||
user_ids = {
|
||||
uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher
|
||||
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
|
||||
}
|
||||
|
||||
logger.debug("With pushers: %r", user_ids)
|
||||
|
@ -436,7 +437,7 @@ class RulesForRoom(object):
|
|||
)
|
||||
|
||||
ret_rules_by_user.update(
|
||||
item for item in iteritems(rules_by_user) if item[0] is not None
|
||||
item for item in rules_by_user.items() if item[0] is not None
|
||||
)
|
||||
|
||||
self.update_cache(sequence, members, ret_rules_by_user, state_group)
|
||||
|
|
|
@ -17,12 +17,11 @@ import email.mime.multipart
|
|||
import email.utils
|
||||
import logging
|
||||
import time
|
||||
import urllib
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
from typing import Iterable, List, TypeVar
|
||||
|
||||
from six.moves import urllib
|
||||
|
||||
import bleach
|
||||
import jinja2
|
||||
|
||||
|
|
|
@ -18,8 +18,6 @@ import logging
|
|||
import re
|
||||
from typing import Pattern
|
||||
|
||||
from six import string_types
|
||||
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import UserID
|
||||
from synapse.util.caches import register_cache
|
||||
|
@ -131,7 +129,7 @@ class PushRuleEvaluatorForEvent(object):
|
|||
# XXX: optimisation: cache our pattern regexps
|
||||
if condition["key"] == "content.body":
|
||||
body = self._event.content.get("body", None)
|
||||
if not body:
|
||||
if not body or not isinstance(body, str):
|
||||
return False
|
||||
|
||||
return _glob_matches(pattern, body, word_boundary=True)
|
||||
|
@ -147,7 +145,7 @@ class PushRuleEvaluatorForEvent(object):
|
|||
return False
|
||||
|
||||
body = self._event.content.get("body", None)
|
||||
if not body:
|
||||
if not body or not isinstance(body, str):
|
||||
return False
|
||||
|
||||
# Similar to _glob_matches, but do not treat display_name as a glob.
|
||||
|
@ -244,7 +242,7 @@ def _flatten_dict(d, prefix=[], result=None):
|
|||
if result is None:
|
||||
result = {}
|
||||
for key, value in d.items():
|
||||
if isinstance(value, string_types):
|
||||
if isinstance(value, str):
|
||||
result[".".join(prefix + [key])] = value.lower()
|
||||
elif hasattr(value, "items"):
|
||||
_flatten_dict(value, prefix=(prefix + [key]), result=result)
|
||||
|
|
|
@ -39,7 +39,10 @@ def get_badge_count(store, user_id):
|
|||
)
|
||||
# return one badge count per conversation, as count per
|
||||
# message is so noisy as to be almost useless
|
||||
badge += 1 if notifs["notify_count"] else 0
|
||||
# We're populating this badge using the unread_count (instead of the
|
||||
# notify_count) as this badge is the number of missed messages, not the
|
||||
# number of missed notifications.
|
||||
badge += 1 if notifs["unread_count"] else 0
|
||||
return badge
|
||||
|
||||
|
||||
|
|
|
@ -215,11 +215,9 @@ class PusherPool:
|
|||
try:
|
||||
# Need to subtract 1 from the minimum because the lower bound here
|
||||
# is not inclusive
|
||||
updated_receipts = yield self.store.get_all_updated_receipts(
|
||||
users_affected = yield self.store.get_users_sent_receipts_between(
|
||||
min_stream_id - 1, max_stream_id
|
||||
)
|
||||
# This returns a tuple, user_id is at index 3
|
||||
users_affected = {r[3] for r in updated_receipts}
|
||||
|
||||
for u in users_affected:
|
||||
if u in self.pushers:
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show more
Loading…
Reference in a new issue