Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

Erik Johnston 2019-11-26 18:56:54 +00:00
commit c665d154a2
56 changed files with 457 additions and 154 deletions

View file

@ -1,7 +1,10 @@
Synapse 1.6.0 (2019-11-26)
==========================
No significant changes.
Bugfixes
--------
- Fix phone home stats reporting. ([\#6418](https://github.com/matrix-org/synapse/issues/6418))
Synapse 1.6.0rc2 (2019-11-25)

View file

@ -133,9 +133,9 @@ sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
sudo yum groupinstall "Development Tools"
```
#### Mac OS X
#### macOS
Installing prerequisites on Mac OS X:
Installing prerequisites on macOS:
```
xcode-select --install
@ -144,6 +144,14 @@ sudo pip install virtualenv
brew install pkg-config libffi
```
On macOS Catalina (10.15) you may need to explicitly install OpenSSL
via brew and inform `pip` about it so that `psycopg2` builds:
```
brew install openssl@1.1
export LDFLAGS=-L/usr/local/Cellar/openssl\@1.1/1.1.1d/lib/
```
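The Cellar path above is pinned to a specific OpenSSL release; as a hedged alternative (assuming `brew --prefix` resolves the formula on your install), you can avoid hard-coding the version:
```
export LDFLAGS="-L$(brew --prefix openssl@1.1)/lib"
pip install psycopg2
```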
#### OpenSUSE
Installing prerequisites on openSUSE:

1
changelog.d/6322.misc Normal file
View file

@ -0,0 +1 @@
Improve the performance of outputting structured logging.

1
changelog.d/6332.bugfix Normal file
View file

@ -0,0 +1 @@
Fix caching devices for remote users when using workers, so that we don't attempt to refetch (and potentially fail) each time a user requests devices.

1
changelog.d/6333.bugfix Normal file
View file

@ -0,0 +1 @@
Prevent account data syncs getting lost across TCP replication.

1
changelog.d/6343.misc Normal file
View file

@ -0,0 +1 @@
Refactor some code in the event authentication path for clarity.

1
changelog.d/6362.misc Normal file
View file

@ -0,0 +1 @@
Clean up some unnecessary quotation marks around the codebase.

1
changelog.d/6379.misc Normal file
View file

@ -0,0 +1 @@
Complain on startup instead of 500'ing at runtime when `public_baseurl` is required but isn't set.

1
changelog.d/6388.doc Normal file
View file

@ -0,0 +1 @@
Fix link in the user directory documentation.

1
changelog.d/6390.doc Normal file
View file

@ -0,0 +1 @@
Add build instructions to the docker readme.

1
changelog.d/6392.misc Normal file
View file

@ -0,0 +1 @@
Add a test scenario to make sure room history purges don't break `/messages` in the future.

1
changelog.d/6408.bugfix Normal file
View file

@ -0,0 +1 @@
Fix an intermittent exception when handling read-receipts.

1
changelog.d/6420.bugfix Normal file
View file

@ -0,0 +1 @@
Fix broken guest registration when there are existing blocks of numeric user IDs.

1
changelog.d/6421.bugfix Normal file
View file

@ -0,0 +1 @@
Fix startup error when http proxy is defined.

View file

@ -130,3 +130,15 @@ docker run -it --rm \
This will generate the same configuration file as the legacy mode used, but
will store it in `/data/homeserver.yaml` instead of a temporary location. You
can then use it as shown above at [Running synapse](#running-synapse).
## Building the image
If you need to build the image from a Synapse checkout, use the following `docker
build` command from the repo's root:
```
docker build -t matrixdotorg/synapse -f docker/Dockerfile .
```
You can choose to build a different docker image by changing the value of the `-f` flag to
point to another Dockerfile.
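To sanity-check a locally built image, you can run it just like the published one; a hedged example (the volume and port flags here are assumptions, see [Running synapse](#running-synapse) for the canonical invocation):
```
docker run -d --name synapse \
    -v /my/synapse/data:/data \
    -p 8008:8008 \
    matrixdotorg/synapse
```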

View file

@ -7,7 +7,6 @@ who are present in a publicly viewable room present on the server.
The directory info is stored in various tables, which can (typically after
DB corruption) get stale or out of sync. If this happens, for now the
solution to fix it is to execute the SQL here
https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/delta/53/user_dir_populate.sql
solution to fix it is to execute the SQL [here](../synapse/storage/data_stores/main/schema/delta/53/user_dir_populate.sql)
and then restart synapse. This should then start a background task to
flush the current tables and regenerate the directory.

View file

@ -69,7 +69,7 @@ class FederationSenderSlaveStore(
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
def _get_federation_out_pos(self, db_conn):
sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?"
sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
sql = self.database_engine.convert_param_style(sql)
txn = db_conn.cursor()

View file

@ -185,7 +185,7 @@ class ApplicationServiceApi(SimpleHttpClient):
if not _is_valid_3pe_metadata(info):
logger.warning(
"query_3pe_protocol to %s did not return a" " valid result", uri
"query_3pe_protocol to %s did not return a valid result", uri
)
return None

View file

@ -134,7 +134,7 @@ def _load_appservice(hostname, as_info, config_filename):
for regex_obj in as_info["namespaces"][ns]:
if not isinstance(regex_obj, dict):
raise ValueError(
"Expected namespace entry in %s to be an object," " but got %s",
"Expected namespace entry in %s to be an object, but got %s",
ns,
regex_obj,
)

View file

@ -146,6 +146,8 @@ class EmailConfig(Config):
if k not in email_config:
missing.append("email." + k)
# public_baseurl is required to build password reset and validation links that
# will be emailed to users
if config.get("public_baseurl") is None:
missing.append("public_baseurl")

View file

@ -106,6 +106,13 @@ class RegistrationConfig(Config):
account_threepid_delegates = config.get("account_threepid_delegates") or {}
self.account_threepid_delegate_email = account_threepid_delegates.get("email")
self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn")
if self.account_threepid_delegate_msisdn and not self.public_baseurl:
raise ConfigError(
"The configuration option `public_baseurl` is required if "
"`account_threepid_delegate.msisdn` is set, such that "
"clients know where to submit validation tokens to. Please "
"configure `public_baseurl`."
)
self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False)
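For illustration, a minimal `homeserver.yaml` fragment that satisfies the new check (both hostnames are hypothetical):
```
public_baseurl: https://matrix.example.com/
account_threepid_delegates:
  msisdn: https://id.example.com
```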

View file

@ -170,7 +170,7 @@ class _RoomDirectoryRule(object):
self.action = action
else:
raise ConfigError(
"%s rules can only have action of 'allow'" " or 'deny'" % (option_name,)
"%s rules can only have action of 'allow' or 'deny'" % (option_name,)
)
self._alias_matches_all = alias == "*"

View file

@ -223,7 +223,7 @@ class ServerConfig(Config):
self.federation_ip_range_blacklist.update(["0.0.0.0", "::"])
except Exception as e:
raise ConfigError(
"Invalid range(s) provided in " "federation_ip_range_blacklist: %s" % e
"Invalid range(s) provided in federation_ip_range_blacklist: %s" % e
)
if self.public_baseurl is not None:
@ -787,14 +787,14 @@ class ServerConfig(Config):
"--print-pidfile",
action="store_true",
default=None,
help="Print the path to the pidfile just" " before daemonizing",
help="Print the path to the pidfile just before daemonizing",
)
server_group.add_argument(
"--manhole",
metavar="PORT",
dest="manhole",
type=int,
help="Turn on the twisted telnet manhole" " service on the given port.",
help="Turn on the twisted telnet manhole service on the given port.",
)

View file

@ -44,7 +44,7 @@ class TransactionActions(object):
response code and response body.
"""
if not transaction.transaction_id:
raise RuntimeError("Cannot persist a transaction with no " "transaction_id")
raise RuntimeError("Cannot persist a transaction with no transaction_id")
return self.store.get_received_txn_response(transaction.transaction_id, origin)
@ -56,7 +56,7 @@ class TransactionActions(object):
Deferred
"""
if not transaction.transaction_id:
raise RuntimeError("Cannot persist a transaction with no " "transaction_id")
raise RuntimeError("Cannot persist a transaction with no transaction_id")
return self.store.set_received_txn_response(
transaction.transaction_id, origin, code, response

View file

@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter(
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total",
"" "Total number of PDUs queued for sending across all destinations",
"Total number of PDUs queued for sending across all destinations",
)

View file

@ -88,7 +88,7 @@ class TransactionManager(object):
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
"TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
@ -107,7 +107,7 @@ class TransactionManager(object):
self._next_txn_id += 1
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
"TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,

View file

@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler):
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400,
"This application service has not reserved" " this kind of alias.",
"This application service has not reserved this kind of alias.",
errcode=Codes.EXCLUSIVE,
)
else:

View file

@ -30,6 +30,7 @@ from twisted.internet import defer
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import (
UserID,
get_domain_from_id,
@ -53,6 +54,12 @@ class E2eKeysHandler(object):
self._edu_updater = SigningKeyEduUpdater(hs, self)
self._is_master = hs.config.worker_app is None
if not self._is_master:
self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
hs
)
federation_registry = hs.get_federation_registry()
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@ -191,9 +198,15 @@ class E2eKeysHandler(object):
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
if self._is_master:
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
else:
user_devices = yield self._user_device_resync_client(
user_id=user_id
)
user_devices = user_devices["devices"]
for device in user_devices:
results[user_id] = {device["device_id"]: device["keys"]}

View file

@ -2040,8 +2040,10 @@ class FederationHandler(BaseHandler):
auth_events (dict[(str, str)->synapse.events.EventBase]):
Map from (event_type, state_key) to event
What we expect the event's auth_events to be, based on the event's
position in the dag. I think? maybe??
Normally, these are our calculated auth_events based on the state of the
room at the event's position in the DAG, though occasionally (eg if the
event is an outlier) they may be the auth events claimed by the remote
server.
Also NB that this function adds entries to it.
Returns:
@ -2091,30 +2093,35 @@ class FederationHandler(BaseHandler):
origin (str):
event (synapse.events.EventBase):
context (synapse.events.snapshot.EventContext):
auth_events (dict[(str, str)->synapse.events.EventBase]):
Map from (event_type, state_key) to event
Normally, these are our calculated auth_events based on the state of the
room at the event's position in the DAG, though occasionally (eg if the
event is an outlier) they may be the auth events claimed by the remote
server.
Also NB that this function adds entries to it.
Returns:
defer.Deferred[EventContext]: updated context
"""
event_auth_events = set(event.auth_event_ids())
if event.is_state():
event_key = (event.type, event.state_key)
else:
event_key = None
# if the event's auth_events refers to events which are not in our
# calculated auth_events, we need to fetch those events from somewhere.
#
# we start by fetching them from the store, and then try calling /event_auth/.
# missing_auth is the set of the event's auth_events which we don't yet have
# in auth_events.
missing_auth = event_auth_events.difference(
e.event_id for e in auth_events.values()
)
# if we have missing events, we need to fetch those events from somewhere.
#
# we start by checking if they are in the store, and then try calling /event_auth/.
if missing_auth:
# TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_seen_events_with_rejections(missing_auth)
logger.debug("Got events %s from store", have_events)
logger.debug("Found events %s in the store", have_events)
missing_auth.difference_update(have_events.keys())
else:
have_events = {}
@ -2169,15 +2176,17 @@ class FederationHandler(BaseHandler):
event.auth_event_ids()
)
except Exception:
# FIXME:
logger.exception("Failed to get auth chain")
if event.internal_metadata.is_outlier():
# XXX: given that, for an outlier, we'll be working with the
# event's *claimed* auth events rather than those we calculated:
# (a) is there any point in this test, since different_auth below will
# obviously be empty
# (b) alternatively, why don't we do it earlier?
logger.info("Skipping auth_event fetch for outlier")
return context
# FIXME: Assumes we have and stored all the state for all the
# prev_events
different_auth = event_auth_events.difference(
e.event_id for e in auth_events.values()
)
@ -2191,27 +2200,22 @@ class FederationHandler(BaseHandler):
different_auth,
)
# now we state-resolve between our own idea of the auth events, and the remote's
# idea of them.
room_version = yield self.store.get_room_version(event.room_id)
different_event_ids = [
d for d in different_auth if d in have_events and not have_events[d]
]
different_events = yield make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self.store.get_event, d, allow_none=True, allow_rejected=False
)
for d in different_auth
if d in have_events and not have_events[d]
],
consumeErrors=True,
)
).addErrback(unwrapFirstError)
if different_event_ids:
# XXX: currently this checks for redactions but I'm not convinced that is
# necessary?
different_events = yield self.store.get_events_as_list(different_event_ids)
if different_events:
local_view = dict(auth_events)
remote_view = dict(auth_events)
remote_view.update(
{(d.type, d.state_key): d for d in different_events if d}
)
remote_view.update({(d.type, d.state_key): d for d in different_events})
new_state = yield self.state_handler.resolve_events(
room_version,
@ -2231,13 +2235,13 @@ class FederationHandler(BaseHandler):
auth_events.update(new_state)
context = yield self._update_context_for_auth_events(
event, context, auth_events, event_key
event, context, auth_events
)
return context
@defer.inlineCallbacks
def _update_context_for_auth_events(self, event, context, auth_events, event_key):
def _update_context_for_auth_events(self, event, context, auth_events):
"""Update the state_ids in an event context after auth event resolution,
storing the changes as a new state group.
@ -2246,18 +2250,21 @@ class FederationHandler(BaseHandler):
context (synapse.events.snapshot.EventContext): initial event context
auth_events (dict[(str, str)->str]): Events to update in the event
auth_events (dict[(str, str)->EventBase]): Events to update in the event
context.
event_key ((str, str)): (type, state_key) for the current event.
this will not be included in the current_state in the context.
Returns:
Deferred[EventContext]: new event context
"""
# exclude the state key of the new event from the current_state in the context.
if event.is_state():
event_key = (event.type, event.state_key)
else:
event_key = None
state_updates = {
k: a.event_id for k, a in iteritems(auth_events) if k != event_key
}
current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = dict(current_state_ids)

View file

@ -96,7 +96,7 @@ def parse_boolean_from_args(args, name, default=None, required=False):
return {b"true": True, b"false": False}[args[name][0]]
except Exception:
message = (
"Boolean query parameter %r must be one of" " ['true', 'false']"
"Boolean query parameter %r must be one of ['true', 'false']"
) % (name,)
raise SynapseError(400, message)
else:

View file

@ -261,6 +261,18 @@ def parse_drain_configs(
)
class StoppableLogPublisher(LogPublisher):
"""
A log publisher that can tell its observers to shut down any external
communications.
"""
def stop(self):
for obs in self._observers:
if hasattr(obs, "stop"):
obs.stop()
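A minimal sketch of the shutdown path this enables, with the class re-declared so the snippet runs standalone (the drain class is hypothetical; the real observers are the file/TCP drains configured below):
```
from twisted.logger import LogPublisher


class StoppableLogPublisher(LogPublisher):
    # As above: forward stop() to any observer that supports it.
    def stop(self):
        for obs in self._observers:
            if hasattr(obs, "stop"):
                obs.stop()


class FakeDrain:
    # Hypothetical observer that holds an external connection.
    def __call__(self, event):
        pass  # a real drain would format and ship the event

    def stop(self):
        print("drain: connection closed")


publisher = StoppableLogPublisher(FakeDrain(), FakeDrain())
publisher.stop()  # closes both drains' connections
```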
def setup_structured_logging(
hs,
config,
@ -336,7 +348,7 @@ def setup_structured_logging(
# We should never get here, but, just in case, throw an error.
raise ConfigError("%s drain type cannot be configured" % (observer.type,))
publisher = LogPublisher(*observers)
publisher = StoppableLogPublisher(*observers)
log_filter = LogLevelFilterPredicate()
for namespace, namespace_config in log_config.get(

View file

@ -17,25 +17,29 @@
Log formatters that output terse JSON.
"""
import json
import sys
import traceback
from collections import deque
from ipaddress import IPv4Address, IPv6Address, ip_address
from math import floor
from typing import IO
from typing import IO, Optional
import attr
from simplejson import dumps
from zope.interface import implementer
from twisted.application.internet import ClientService
from twisted.internet.defer import Deferred
from twisted.internet.endpoints import (
HostnameEndpoint,
TCP4ClientEndpoint,
TCP6ClientEndpoint,
)
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol
from twisted.logger import FileLogObserver, ILogObserver, Logger
from twisted.python.failure import Failure
_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
def flatten_event(event: dict, metadata: dict, include_time: bool = False):
@ -141,11 +145,49 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb
def formatEvent(_event: dict) -> str:
flattened = flatten_event(_event, metadata)
return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n"
return _encoder.encode(flattened) + "\n"
return FileLogObserver(outFile, formatEvent)
@attr.s
@implementer(IPushProducer)
class LogProducer(object):
"""
An IPushProducer that writes logs from its buffer to its transport when it
is resumed.
Args:
buffer: Log buffer to read logs from.
transport: Transport to write to.
"""
transport = attr.ib(type=ITransport)
_buffer = attr.ib(type=deque)
_paused = attr.ib(default=False, type=bool, init=False)
def pauseProducing(self):
self._paused = True
def stopProducing(self):
self._paused = True
self._buffer = None
def resumeProducing(self):
self._paused = False
while self._paused is False and (self._buffer and self.transport.connected):
try:
event = self._buffer.popleft()
self.transport.write(_encoder.encode(event).encode("utf8"))
self.transport.write(b"\n")
except Exception:
# Something has gone wrong writing to the transport -- log it
# and break out of the while.
traceback.print_exc(file=sys.__stderr__)
break
@attr.s
@implementer(ILogObserver)
class TerseJSONToTCPLogObserver(object):
@ -165,8 +207,9 @@ class TerseJSONToTCPLogObserver(object):
metadata = attr.ib(type=dict)
maximum_buffer = attr.ib(type=int)
_buffer = attr.ib(default=attr.Factory(deque), type=deque)
_writer = attr.ib(default=None)
_connection_waiter = attr.ib(default=None, type=Optional[Deferred])
_logger = attr.ib(default=attr.Factory(Logger))
_producer = attr.ib(default=None, type=Optional[LogProducer])
def start(self) -> None:
@ -187,38 +230,43 @@ class TerseJSONToTCPLogObserver(object):
factory = Factory.forProtocol(Protocol)
self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
self._service.startService()
self._connect()
def _write_loop(self) -> None:
def stop(self):
self._service.stopService()
def _connect(self) -> None:
"""
Implement the write loop.
Triggers an attempt to connect and then write to the remote, unless a connection attempt is already in progress.
"""
if self._writer:
if self._connection_waiter:
return
self._writer = self._service.whenConnected()
self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
@self._writer.addBoth
@self._connection_waiter.addErrback
def fail(r):
r.printTraceback(file=sys.__stderr__)
self._connection_waiter = None
self._connect()
@self._connection_waiter.addCallback
def writer(r):
if isinstance(r, Failure):
r.printTraceback(file=sys.__stderr__)
self._writer = None
self.hs.get_reactor().callLater(1, self._write_loop)
# We have a connection. If we already have a producer, and its
# transport is the same, just trigger a resumeProducing.
if self._producer and r.transport is self._producer.transport:
self._producer.resumeProducing()
return
try:
for event in self._buffer:
r.transport.write(
dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
"utf8"
)
)
r.transport.write(b"\n")
self._buffer.clear()
except Exception as e:
sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))
# If the producer is still producing, stop it.
if self._producer:
self._producer.stopProducing()
self._writer = False
self.hs.get_reactor().callLater(1, self._write_loop)
# Make a new producer and start it.
self._producer = LogProducer(buffer=self._buffer, transport=r.transport)
r.transport.registerProducer(self._producer, True)
self._producer.resumeProducing()
self._connection_waiter = None
def _handle_pressure(self) -> None:
"""
@ -277,4 +325,4 @@ class TerseJSONToTCPLogObserver(object):
self._logger.failure("Failed clearing backpressure")
# Try and write immediately.
self._write_loop()
self._connect()

View file

@ -250,7 +250,7 @@ class HttpPusher(object):
# fixed, we don't suddenly deliver a load
# of old notifications.
logger.warning(
"Giving up on a notification to user %s, " "pushkey %s",
"Giving up on a notification to user %s, pushkey %s",
self.user_id,
self.pushkey,
)
@ -303,8 +303,7 @@ class HttpPusher(object):
# for sanity, we only remove the pushkey if it
# was the one we actually sent...
logger.warning(
("Ignoring rejected pushkey %s because we" " didn't send it"),
pk,
("Ignoring rejected pushkey %s because we didn't send it"), pk,
)
else:
logger.info("Pushkey %s was rejected: removing", pk)

View file

@ -43,7 +43,7 @@ logger = logging.getLogger(__name__)
MESSAGE_FROM_PERSON_IN_ROOM = (
"You have a message on %(app)s from %(person)s " "in the %(room)s room..."
"You have a message on %(app)s from %(person)s in the %(room)s room..."
)
MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
@ -55,7 +55,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = (
"You have messages on %(app)s from %(person)s and others..."
)
INVITE_FROM_PERSON_TO_ROOM = (
"%(person)s has invited you to join the " "%(room)s room on %(app)s..."
"%(person)s has invited you to join the %(room)s room on %(app)s..."
)
INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."

View file

@ -14,7 +14,14 @@
# limitations under the License.
from synapse.http.server import JsonResource
from synapse.replication.http import federation, login, membership, register, send_event
from synapse.replication.http import (
devices,
federation,
login,
membership,
register,
send_event,
)
REPLICATION_PREFIX = "/_synapse/replication"
@ -30,3 +37,4 @@ class ReplicationRestResource(JsonResource):
federation.register_servlets(hs, self)
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)

View file

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.replication.http._base import ReplicationEndpoint
logger = logging.getLogger(__name__)
class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for a user by contacting their
server.
This must happen on master so that the results can be correctly cached in
the database and streamed to workers.
Request format:
POST /_synapse/replication/user_device_resync/:user_id
{}
Response is equivalent to `/_matrix/federation/v1/user/devices/:user_id`
response, e.g.:
{
"user_id": "@alice:example.org",
"devices": [
{
"device_id": "JLAFKJWSCS",
"keys": { ... },
"device_display_name": "Alice's Mobile Phone"
}
]
}
"""
NAME = "user_device_resync"
PATH_ARGS = ("user_id",)
CACHE = False
def __init__(self, hs):
super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)
self.device_list_updater = hs.get_device_handler().device_list_updater
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(user_id):
return {}
async def _handle_request(self, request, user_id):
user_devices = await self.device_list_updater.user_device_resync(user_id)
return 200, user_devices
def register_servlets(hs, http_server):
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
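On a worker, the servlet is reached through its generated client rather than being registered directly; a hedged sketch of that pattern, mirroring the e2e keys handler change above (`hs` is the running HomeServer):
```
from twisted.internet import defer

from synapse.replication.http.devices import (
    ReplicationUserDevicesResyncRestServlet,
)


@defer.inlineCallbacks
def resync_on_worker(hs, user_id):
    # Ask the master to contact the remote server, cache the device list
    # and stream it out; the result has the /user/devices shape above.
    client = ReplicationUserDevicesResyncRestServlet.make_client(hs)
    user_devices = yield client(user_id=user_id)
    return user_devices
```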

View file

@ -88,8 +88,7 @@ TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)
AccountDataStreamRow = namedtuple(
"AccountDataStream",
("user_id", "room_id", "data_type", "data"), # str # str # str # dict
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
)
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
@ -421,8 +420,8 @@ class AccountDataStream(Stream):
results = list(room_results)
results.extend(
(stream_id, user_id, None, account_data_type, content)
for stream_id, user_id, account_data_type, content in global_results
(stream_id, user_id, None, account_data_type)
for stream_id, user_id, account_data_type in global_results
)
return results

View file

@ -77,8 +77,8 @@ class PreviewUrlResource(DirectServeResource):
treq_args={"browser_like_redirects": True},
ip_whitelist=hs.config.url_preview_ip_range_whitelist,
ip_blacklist=hs.config.url_preview_ip_range_blacklist,
http_proxy=os.getenv("http_proxy"),
https_proxy=os.getenv("HTTPS_PROXY"),
http_proxy=os.getenvb(b"http_proxy"),
https_proxy=os.getenvb(b"HTTPS_PROXY"),
)
self.media_repo = media_repo
self.primary_base_path = media_repo.primary_base_path
@ -122,7 +122,7 @@ class PreviewUrlResource(DirectServeResource):
pattern = entry[attrib]
value = getattr(url_tuple, attrib)
logger.debug(
"Matching attrib '%s' with value '%s' against" " pattern '%s'",
"Matching attrib '%s' with value '%s' against pattern '%s'",
attrib,
value,
pattern,

View file

@ -318,8 +318,8 @@ class HomeServer(object):
def build_proxied_http_client(self):
return SimpleHttpClient(
self,
http_proxy=os.getenv("http_proxy"),
https_proxy=os.getenv("HTTPS_PROXY"),
http_proxy=os.getenvb(b"http_proxy"),
https_proxy=os.getenvb(b"HTTPS_PROXY"),
)
def build_room_creation_handler(self):

View file

@ -54,7 +54,7 @@ class ConsentServerNotices(object):
)
if "body" not in self._server_notice_content:
raise ConfigError(
"user_consent server_notice_consent must contain a 'body' " "key."
"user_consent server_notice_consent must contain a 'body' key."
)
self._consent_uri_builder = ConsentURIBuilder(hs.config)

View file

@ -409,16 +409,15 @@ class SQLBaseStore(object):
i = 0
N = 5
while True:
cursor = LoggingTransaction(
conn.cursor(),
name,
self.database_engine,
after_callbacks,
exception_callbacks,
)
try:
txn = conn.cursor()
txn = LoggingTransaction(
txn,
name,
self.database_engine,
after_callbacks,
exception_callbacks,
)
r = func(txn, *args, **kwargs)
r = func(cursor, *args, **kwargs)
conn.commit()
return r
except self.database_engine.module.OperationalError as e:
@ -456,6 +455,40 @@ class SQLBaseStore(object):
)
continue
raise
finally:
# we're either about to retry with a new cursor, or we're about to
# release the connection. Once we release the connection, it could
# get used for another query, which might do a conn.rollback().
#
# In the latter case, even though that probably wouldn't affect the
# results of this transaction, python's sqlite will reset all
# statements on the connection [1], which will make our cursor
# invalid [2].
#
# In any case, continuing to read rows after commit()ing seems
# dubious from the PoV of ACID transactional semantics
# (sqlite explicitly says that once you commit, you may see rows
# from subsequent updates.)
#
# In psycopg2, cursors are essentially a client-side fabrication -
# all the data is transferred to the client side when the statement
# finishes executing - so in theory we could go on streaming results
# from the cursor, but attempting to do so would make us
# incompatible with sqlite, so let's make sure we're not doing that
# by closing the cursor.
#
# (*named* cursors in psycopg2 are different and are proper server-
# side things, but (a) we don't use them and (b) they are implicitly
# closed by ending the transaction anyway.)
#
# In short, if we haven't finished with the cursor yet, that's a
# problem waiting to bite us.
#
# TL;DR: we're done with the cursor, so we can close it.
#
# [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
# [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
cursor.close()
except Exception as e:
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
@ -851,7 +884,7 @@ class SQLBaseStore(object):
allvalues.update(values)
latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % (
sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues),

View file

@ -184,14 +184,14 @@ class AccountDataWorkerStore(SQLBaseStore):
current_id(int): The position to fetch up to.
Returns:
A deferred pair of lists of tuples of stream_id int, user_id string,
room_id string, type string, and content string.
room_id string, and type string.
"""
if last_room_id == current_id and last_global_id == current_id:
return defer.succeed(([], []))
def get_updated_account_data_txn(txn):
sql = (
"SELECT stream_id, user_id, account_data_type, content"
"SELECT stream_id, user_id, account_data_type"
" FROM account_data WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
@ -199,7 +199,7 @@ class AccountDataWorkerStore(SQLBaseStore):
global_results = txn.fetchall()
sql = (
"SELECT stream_id, user_id, room_id, account_data_type, content"
"SELECT stream_id, user_id, room_id, account_data_type"
" FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)

View file

@ -380,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
devices = list(messages_by_device.keys())
if len(devices) == 1 and devices[0] == "*":
# Handle wildcard device_ids.
sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
sql = "SELECT device_id FROM devices WHERE user_id = ?"
txn.execute(sql, (user_id,))
message_json = json.dumps(messages_by_device["*"])
for row in txn:

View file

@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
result.setdefault(user_id, {})[device_id] = None
# get signatures on the device
signature_sql = (
"SELECT * " " FROM e2e_cross_signing_signatures " " WHERE %s"
) % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % (
" OR ".join("(" + q + ")" for q in signature_query_clauses)
)
txn.execute(signature_sql, signature_query_params)
rows = self.cursor_to_dict(txn)

View file

@ -713,9 +713,7 @@ class EventsStore(
metadata_json = encode_json(event.internal_metadata.get_dict())
sql = (
"UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
)
sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id))
# Add an entry to the ex_outlier_stream table to replicate the
@ -732,7 +730,7 @@ class EventsStore(
},
)
sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
txn.execute(sql, (False, event.event_id))
# Update the event_backward_extremities table now that this
@ -1479,7 +1477,7 @@ class EventsStore(
# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so let's make an index.
txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
txn.execute("SELECT event_id, should_delete FROM events_to_purge")
event_rows = txn.fetchall()

View file

@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore):
if filter_id_response is not None:
return filter_id_response[0]
sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?"
sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
txn.execute(sql, (user_localpart,))
max_id = txn.fetchone()[0]
if max_id is None:

View file

@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
if len(media_ids) == 0:
return
sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?"
sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
def _delete_url_cache_txn(txn):
txn.executemany(sql, [(media_id,) for media_id in media_ids])
@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
return
def _delete_url_cache_media_txn(txn):
sql = "DELETE FROM local_media_repository" " WHERE media_id = ?"
sql = "DELETE FROM local_media_repository WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids])
sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?"
sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids])

View file

@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
args.append(limit)
txn.execute(sql, args)
return (r[0:5] + (json.loads(r[5]),) for r in txn)
return list(r[0:5] + (json.loads(r[5]),) for r in txn)
return self.runInteraction(
"get_all_updated_receipts", get_all_updated_receipts_txn

View file

@ -19,7 +19,6 @@ import logging
import re
from six import iterkeys
from six.moves import range
from twisted.internet import defer
from twisted.internet.defer import Deferred
@ -377,9 +376,7 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
def f(txn):
sql = (
"SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)"
)
sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)"
txn.execute(sql, (user_id,))
return dict(txn)
@ -484,12 +481,8 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
Gets the localpart of the next generated user ID.
Generated user IDs are integers, and we aim for them to be as small as
we can. Unfortunately, it's possible some of them are already taken by
existing users, and there may be gaps in the already taken range. This
function returns the start of the first allocatable gap. This is to
avoid the case of ID 1000 being pre-allocated and starting at 1001 while
0-999 are available.
Generated user IDs are integers, so we find the largest integer user ID
already taken and return that plus one.
"""
def _find_next_generated_user_id(txn):
@ -499,15 +492,14 @@ class RegistrationWorkerStore(SQLBaseStore):
regex = re.compile(r"^@(\d+):")
found = set()
max_found = 0
for (user_id,) in txn:
match = regex.search(user_id)
if match:
found.add(int(match.group(1)))
for i in range(len(found) + 1):
if i not in found:
return i
max_found = max(int(match.group(1)), max_found)
return max_found + 1
return (
(
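A self-contained sketch of the new allocation rule in `_find_next_generated_user_id` (the sample IDs are hypothetical):
```
import re

regex = re.compile(r"^@(\d+):")


def next_generated_user_id(user_ids):
    # Find the largest numeric localpart already taken and return it plus one.
    max_found = 0
    for user_id in user_ids:
        match = regex.search(user_id)
        if match:
            max_found = max(int(match.group(1)), max_found)
    return max_found + 1


# With @1 and @1000 taken, the next ID is now 1001; the old code would have
# filled the first gap (2) instead.
assert next_generated_user_id(["@1:hs", "@1000:hs", "@bob:hs"]) == 1001
```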

View file

@ -616,7 +616,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def _get_max_topological_txn(self, txn, room_id):
txn.execute(
"SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?",
"SELECT MAX(topological_ordering) FROM events WHERE room_id = ?",
(room_id,),
)

View file

@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
)
def get_tag_content(txn, tag_ids):
sql = (
"SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?"
)
sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?"
results = []
for stream_id, user_id, room_id in tag_ids:
txn.execute(sql, (user_id, room_id))

View file

@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
# Mark as done.
cur.execute(
database_engine.convert_param_style(
"INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)"
"INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
),
(modname, name),
)

View file

@ -88,9 +88,12 @@ class PaginationConfig(object):
raise SynapseError(400, "Invalid request.")
def __repr__(self):
return (
"PaginationConfig(from_tok=%r, to_tok=%r," " direction=%r, limit=%r)"
) % (self.from_token, self.to_token, self.direction, self.limit)
return ("PaginationConfig(from_tok=%r, to_tok=%r, direction=%r, limit=%r)") % (
self.from_token,
self.to_token,
self.direction,
self.limit,
)
def get_source_config(self, source_name):
keyname = "%s_key" % source_name

View file

@ -25,7 +25,9 @@ from twisted.internet import defer
import synapse.rest.admin
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.handlers.pagination import PurgeStatus
from synapse.rest.client.v1 import login, profile, room
from synapse.util.stringutils import random_string
from tests import unittest
@ -910,6 +912,78 @@ class RoomMessageListTestCase(RoomBase):
return channel.json_body["chunk"]
def test_room_messages_purge(self):
store = self.hs.get_datastore()
pagination_handler = self.hs.get_pagination_handler()
# Send a first message in the room, which will be removed by the purge.
first_event_id = self.helper.send(self.room_id, "message 1")["event_id"]
first_token = self.get_success(
store.get_topological_token_for_event(first_event_id)
)
# Send a second message in the room, which won't be removed, and which we'll
# use as the marker to purge events before.
second_event_id = self.helper.send(self.room_id, "message 2")["event_id"]
second_token = self.get_success(
store.get_topological_token_for_event(second_event_id)
)
# Send a third event in the room to ensure we don't hit any edge case caused
# by our marker being the latest forward extremity in the room.
self.helper.send(self.room_id, "message 3")
# Check that we get the first and second message when querying /messages.
request, channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s"
% (self.room_id, second_token, json.dumps({"types": [EventTypes.Message]})),
)
self.render(request)
self.assertEqual(channel.code, 200, channel.json_body)
chunk = channel.json_body["chunk"]
self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
# Purge every event before the second event.
purge_id = random_string(16)
pagination_handler._purges_by_id[purge_id] = PurgeStatus()
self.get_success(
pagination_handler._purge_history(
purge_id=purge_id,
room_id=self.room_id,
token=second_token,
delete_local_events=True,
)
)
# Check that we only get the second message through /messages now that the first
# has been purged.
request, channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s"
% (self.room_id, second_token, json.dumps({"types": [EventTypes.Message]})),
)
self.render(request)
self.assertEqual(channel.code, 200, channel.json_body)
chunk = channel.json_body["chunk"]
self.assertEqual(len(chunk), 1, [event["content"] for event in chunk])
# Check that we get no event, but also no error, when querying /messages with
# the token that was pointing at the first event, because we don't have it
# anymore.
request, channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s"
% (self.room_id, first_token, json.dumps({"types": [EventTypes.Message]})),
)
self.render(request)
self.assertEqual(channel.code, 200, channel.json_body)
chunk = channel.json_body["chunk"]
self.assertEqual(len(chunk), 0, [event["content"] for event in chunk])
class RoomSearchTestCase(unittest.HomeserverTestCase):
servlets = [

View file

@ -203,6 +203,7 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase):
@unittest.override_config(
{
"public_baseurl": "https://test_server",
"enable_registration_captcha": True,
"user_consent": {
"version": "1",

View file

@ -379,6 +379,7 @@ class FakeTransport(object):
disconnecting = False
disconnected = False
connected = True
buffer = attr.ib(default=b"")
producer = attr.ib(default=None)
autoflush = attr.ib(default=True)
@ -402,6 +403,7 @@ class FakeTransport(object):
"FakeTransport: Delaying disconnect until buffer is flushed"
)
else:
self.connected = False
self.disconnected = True
def abortConnection(self):