Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2018-06-25 14:37:13 +01:00
commit 4c22c9b0b6
17 changed files with 244 additions and 47 deletions

View file

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet import defer
from ._base import BaseHandler
from synapse.types import UserID, create_requester
@ -39,10 +39,10 @@ class DeactivateAccountHandler(BaseHandler):
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
reactor.callWhenRunning(self._start_user_parting)
hs.get_reactor().callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
def deactivate_account(self, user_id):
def deactivate_account(self, user_id, erase_data):
"""Deactivate a user's account
Args:
@ -92,6 +92,11 @@ class DeactivateAccountHandler(BaseHandler):
# delete from user directory
yield self.user_directory_handler.handle_user_deactivated(user_id)
# Mark the user as erased, if they asked for that
if erase_data:
logger.info("Marking %s as erased", user_id)
yield self.store.mark_user_erased(user_id)
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
self._start_user_parting()

View file

@ -495,7 +495,20 @@ class FederationHandler(BaseHandler):
for e_id, key_to_eid in event_to_state_ids.iteritems()
}
erased_senders = yield self.store.are_users_erased(
e.sender for e in events,
)
def redact_disallowed(event, state):
# if the sender has been gdpr17ed, always return a redacted
# copy of the event.
if erased_senders[event.sender]:
logger.info(
"Sender of %s has been erased, redacting",
event.event_id,
)
return prune_event(event)
if not state:
return event

View file

@ -20,7 +20,7 @@ import sys
from canonicaljson import encode_canonical_json
import six
from six import string_types, itervalues, iteritems
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.defer import succeed
from twisted.python.failure import Failure
@ -157,7 +157,7 @@ class MessageHandler(BaseHandler):
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
reactor.callLater(24 * 3600, clear_purge)
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
"""Get the current status of an active purge

View file

@ -22,7 +22,7 @@ The methods that define policy are:
- should_notify
"""
from twisted.internet import defer, reactor
from twisted.internet import defer
from contextlib import contextmanager
from six import itervalues, iteritems
@ -179,7 +179,7 @@ class PresenceHandler(object):
# have not yet been persisted
self.unpersisted_users_changes = set()
reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
self.serial_to_user = {}
self._next_serial = 1

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import ConnectError
from twisted.names import client, dns
from twisted.names.error import DNSNameError, DomainError
@ -78,17 +78,18 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
else:
return _WrappingEndpointFac(transport_endpoint(
reactor, domain, port, **endpoint_kw_args
))
), reactor)
class _WrappingEndpointFac(object):
def __init__(self, endpoint_fac):
def __init__(self, endpoint_fac, reactor):
self.endpoint_fac = endpoint_fac
self.reactor = reactor
@defer.inlineCallbacks
def connect(self, protocolFactory):
conn = yield self.endpoint_fac.connect(protocolFactory)
conn = _WrappedConnection(conn)
conn = _WrappedConnection(conn, self.reactor)
defer.returnValue(conn)
@ -98,9 +99,10 @@ class _WrappedConnection(object):
"""
__slots__ = ["conn", "last_request"]
def __init__(self, conn):
def __init__(self, conn, reactor):
object.__setattr__(self, "conn", conn)
object.__setattr__(self, "last_request", time.time())
self._reactor = reactor
def __getattr__(self, name):
return getattr(self.conn, name)
@ -131,14 +133,14 @@ class _WrappedConnection(object):
# Time this connection out if we haven't send a request in the last
# N minutes
# TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe)
self._reactor.callLater(3 * 60, self._time_things_out_maybe)
d = self.conn.request(request)
def update_request_time(res):
self.last_request = time.time()
# TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe)
self._reactor.callLater(3 * 60, self._time_things_out_maybe)
return res
d.addCallback(update_request_time)

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
import logging
@ -199,7 +199,7 @@ class EmailPusher(object):
self.timed_call = None
if soonest_due_at is not None:
self.timed_call = reactor.callLater(
self.timed_call = self.hs.get_reactor().callLater(
self.seconds_until(soonest_due_at), self.on_timer
)

View file

@ -15,7 +15,7 @@
# limitations under the License.
import logging
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from . import push_rule_evaluator
@ -220,7 +220,9 @@ class HttpPusher(object):
)
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
self.timed_call = self.hs.get_reactor().callLater(
self.backoff_delay, self.on_timer
)
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break

View file

@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
from synapse.storage.user_erasure_store import UserErasureWorkerStore
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@ -45,6 +46,7 @@ class SlavedEventStore(EventFederationWorkerStore,
EventsWorkerStore,
StateGroupWorkerStore,
SignatureWorkerStore,
UserErasureWorkerStore,
BaseSlavedStore):
def __init__(self, db_conn, hs):

View file

@ -15,7 +15,7 @@
"""A replication client for use by synapse workers.
"""
from twisted.internet import reactor, defer
from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory
from .commands import (
@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
self.server_name = hs.config.server_name
self._clock = hs.get_clock() # As self.clock is defined in super class
reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
def startedConnecting(self, connector):
logger.info("Connecting to replication: %r", connector.getDestination())
@ -95,7 +95,7 @@ class ReplicationClientHandler(object):
factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
reactor.connectTCP(host, port, factory)
hs.get_reactor().connectTCP(host, port, factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes

View file

@ -15,7 +15,7 @@
"""The server side of the replication stream.
"""
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.protocol import Factory
from .streams import STREAMS_MAP, FederationStream
@ -109,7 +109,7 @@ class ReplicationStreamer(object):
self.is_looping = False
self.pending_updates = False
reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
def on_shutdown(self):
# close all connections on shutdown

View file

@ -254,7 +254,9 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
yield self._deactivate_account_handler.deactivate_account(target_user_id)
yield self._deactivate_account_handler.deactivate_account(
target_user_id, False,
)
defer.returnValue((200, {}))

View file

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,6 +16,7 @@
# limitations under the License.
import logging
from six.moves import http_client
from twisted.internet import defer
from synapse.api.auth import has_access_token
@ -186,13 +188,20 @@ class DeactivateAccountRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
body = parse_json_object_from_request(request)
erase = body.get("erase", False)
if not isinstance(erase, bool):
raise SynapseError(
http_client.BAD_REQUEST,
"Param 'erase' must be a boolean, if given",
Codes.BAD_JSON,
)
requester = yield self.auth.get_user_by_req(request)
# allow ASes to dectivate their own users
if requester.app_service:
yield self._deactivate_account_handler.deactivate_account(
requester.user.to_string()
requester.user.to_string(), erase,
)
defer.returnValue((200, {}))
@ -200,7 +209,7 @@ class DeactivateAccountRestServlet(RestServlet):
requester, body, self.hs.get_ip_from_request(request),
)
yield self._deactivate_account_handler.deactivate_account(
requester.user.to_string(),
requester.user.to_string(), erase,
)
defer.returnValue((200, {}))

View file

@ -20,6 +20,7 @@ import time
import logging
from synapse.storage.devices import DeviceStore
from synapse.storage.user_erasure_store import UserErasureStore
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
@ -88,6 +89,7 @@ class DataStore(RoomMemberStore, RoomStore,
DeviceInboxStore,
UserDirectoryStore,
GroupServerStore,
UserErasureStore,
):
def __init__(self, db_conn, hs):

View file

@ -0,0 +1,21 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- a table of users who have requested that their details be erased
CREATE TABLE erased_users (
user_id TEXT NOT NULL
);
CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id);

View file

@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import operator
from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedList, cached
class UserErasureWorkerStore(SQLBaseStore):
@cached()
def is_user_erased(self, user_id):
"""
Check if the given user id has requested erasure
Args:
user_id (str): full user id to check
Returns:
Deferred[bool]: True if the user has requested erasure
"""
return self._simple_select_onecol(
table="erased_users",
keyvalues={"user_id": user_id},
retcol="1",
desc="is_user_erased",
).addCallback(operator.truth)
@cachedList(
cached_method_name="is_user_erased",
list_name="user_ids",
inlineCallbacks=True,
)
def are_users_erased(self, user_ids):
"""
Checks which users in a list have requested erasure
Args:
user_ids (iterable[str]): full user id to check
Returns:
Deferred[dict[str, bool]]:
for each user, whether the user has requested erasure.
"""
# this serves the dual purpose of (a) making sure we can do len and
# iterate it multiple times, and (b) avoiding duplicates.
user_ids = tuple(set(user_ids))
def _get_erased_users(txn):
txn.execute(
"SELECT user_id FROM erased_users WHERE user_id IN (%s)" % (
",".join("?" * len(user_ids))
),
user_ids,
)
return set(r[0] for r in txn)
erased_users = yield self.runInteraction(
"are_users_erased", _get_erased_users,
)
res = dict((u, u in erased_users) for u in user_ids)
defer.returnValue(res)
class UserErasureStore(UserErasureWorkerStore):
def mark_user_erased(self, user_id):
"""Indicate that user_id wishes their message history to be erased.
Args:
user_id (str): full user_id to be erased
"""
def f(txn):
# first check if they are already in the list
txn.execute(
"SELECT 1 FROM erased_users WHERE user_id = ?",
(user_id, )
)
if txn.fetchone():
return
# they are not already there: do the insert.
txn.execute(
"INSERT INTO erased_users (user_id) VALUES (?)",
(user_id, )
)
self._invalidate_cache_and_stream(
txn, self.is_user_erased, (user_id,)
)
return self.runInteraction("mark_user_erased", f)

View file

@ -34,6 +34,9 @@ def unwrapFirstError(failure):
class Clock(object):
"""
A Clock wraps a Twisted reactor and provides utilities on top of it.
Args:
reactor: The Twisted reactor to use.
"""
_reactor = attr.ib()

View file

@ -12,15 +12,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
import operator
from twisted.internet import defer
from synapse.api.constants import Membership, EventTypes
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.api.constants import EventTypes, Membership
from synapse.events.utils import prune_event
from synapse.util.logcontext import (
make_deferred_yieldable, preserve_fn,
)
logger = logging.getLogger(__name__)
@ -95,16 +97,27 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if ignore_dict_content else []
)
erased_senders = yield store.are_users_erased((e.sender for e in events))
def allowed(event):
"""
Args:
event (synapse.events.EventBase): event to check
Returns:
None|EventBase:
None if the user cannot see this event at all
a redacted copy of the event if they can only see a redacted
version
the original event if they can see it as normal.
"""
if not event.is_state() and event.sender in ignore_list:
return False
return None
if event.event_id in always_include_ids:
return True
return event
state = event_id_to_state[event.event_id]
@ -118,10 +131,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if visibility not in VISIBILITY_PRIORITY:
visibility = "shared"
# if it was world_readable, it's easy: everyone can read it
if visibility == "world_readable":
return True
# Always allow history visibility events on boundaries. This is done
# by setting the effective visibility to the least restrictive
# of the old vs new.
@ -155,7 +164,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if membership == "leave" and (
prev_membership == "join" or prev_membership == "invite"
):
return True
return event
new_priority = MEMBERSHIP_PRIORITY.index(membership)
old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
@ -166,31 +175,55 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if membership is None:
membership_event = state.get((EventTypes.Member, user_id), None)
if membership_event:
# XXX why do we do this?
# https://github.com/matrix-org/synapse/issues/3350
if membership_event.event_id not in event_id_forgotten:
membership = membership_event.membership
# if the user was a member of the room at the time of the event,
# they can see it.
if membership == Membership.JOIN:
return True
return event
# otherwise, it depends on the room visibility.
if visibility == "joined":
# we weren't a member at the time of the event, so we can't
# see this event.
return False
return None
elif visibility == "invited":
# user can also see the event if they were *invited* at the time
# of the event.
return membership == Membership.INVITE
return (
event if membership == Membership.INVITE else None
)
else:
# visibility is shared: user can also see the event if they have
# become a member since the event
elif visibility == "shared" and is_peeking:
# if the visibility is shared, users cannot see the event unless
# they have *subequently* joined the room (or were members at the
# time, of course)
#
# XXX: if the user has subsequently joined and then left again,
# ideally we would share history up to the point they left. But
# we don't know when they left.
return not is_peeking
# we don't know when they left. We just treat it as though they
# never joined, and restrict access.
return None
defer.returnValue(list(filter(allowed, events)))
# the visibility is either shared or world_readable, and the user was
# not a member at the time. We allow it, provided the original sender
# has not requested their data to be erased, in which case, we return
# a redacted version.
if erased_senders[event.sender]:
return prune_event(event)
return event
# check each event: gives an iterable[None|EventBase]
filtered_events = itertools.imap(allowed, events)
# remove the None entries
filtered_events = filter(operator.truth, filtered_events)
# we turn it into a list before returning it.
defer.returnValue(list(filtered_events))