0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-11-15 22:42:23 +01:00

Merge branch 'develop' of github.com:matrix-org/synapse into erikj/speed_up_calculate_state_delta

This commit is contained in:
Erik Johnston 2018-07-24 11:32:13 +01:00
commit 97acd385a3
17 changed files with 415 additions and 315 deletions

1
changelog.d/3555.feature Normal file
View file

@ -0,0 +1 @@
Add support for client_reader to handle more APIs

1
changelog.d/3590.misc Normal file
View file

@ -0,0 +1 @@
Add some measure blocks to persist_events

1
changelog.d/3591.misc Normal file
View file

@ -0,0 +1 @@
Fix some random logcontext leaks.

View file

@ -206,6 +206,10 @@ Handles client API endpoints. It can handle REST endpoints matching the
following regular expressions:: following regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$ ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
``synapse.app.user_dir`` ``synapse.app.user_dir``
~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~

View file

@ -739,3 +739,37 @@ class Auth(object):
) )
return query_params[0] return query_params[0]
@defer.inlineCallbacks
def check_in_room_or_world_readable(self, room_id, user_id):
"""Checks that the user is or was in the room or the room is world
readable. If it isn't then an exception is raised.
Returns:
Deferred[tuple[str, str|None]]: Resolves to the current membership of
the user in the room and the membership event ID of the user. If
the user is not in the room and never has been, then
`(Membership.JOIN, None)` is returned.
"""
try:
# check_user_was_in_room will return the most recent membership
# event for the user if:
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.check_user_was_in_room(room_id, user_id)
defer.returnValue((member_event.membership, member_event.event_id))
except AuthError:
visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if (
visibility and
visibility.content["history_visibility"] == "world_readable"
):
defer.returnValue((Membership.JOIN, None))
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)

View file

@ -40,7 +40,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import PublicRoomListRestServlet from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
PublicRoomListRestServlet,
RoomEventContextServlet,
RoomMemberListRestServlet,
RoomStateRestServlet,
)
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
@ -82,7 +88,13 @@ class ClientReaderServer(HomeServer):
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client": elif name == "client":
resource = JsonResource(self, canonical_json=False) resource = JsonResource(self, canonical_json=False)
PublicRoomListRestServlet(self).register(resource) PublicRoomListRestServlet(self).register(resource)
RoomMemberListRestServlet(self).register(resource)
JoinedRoomMemberListRestServlet(self).register(resource)
RoomStateRestServlet(self).register(resource)
RoomEventContextServlet(self).register(resource)
resources.update({ resources.update({
"/_matrix/client/r0": resource, "/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource, "/_matrix/client/unstable": resource,

View file

@ -17,9 +17,7 @@ from .admin import AdminHandler
from .directory import DirectoryHandler from .directory import DirectoryHandler
from .federation import FederationHandler from .federation import FederationHandler
from .identity import IdentityHandler from .identity import IdentityHandler
from .message import MessageHandler
from .register import RegistrationHandler from .register import RegistrationHandler
from .room import RoomContextHandler
from .search import SearchHandler from .search import SearchHandler
@ -44,10 +42,8 @@ class Handlers(object):
def __init__(self, hs): def __init__(self, hs):
self.registration_handler = RegistrationHandler(hs) self.registration_handler = RegistrationHandler(hs)
self.message_handler = MessageHandler(hs)
self.federation_handler = FederationHandler(hs) self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs) self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs) self.admin_handler = AdminHandler(hs)
self.identity_handler = IdentityHandler(hs) self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs) self.search_handler = SearchHandler(hs)
self.room_context_handler = RoomContextHandler(hs)

View file

@ -23,6 +23,7 @@ from twisted.internet import defer
import synapse import synapse
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
yield self._check_user_exists(event.state_key) yield self._check_user_exists(event.state_key)
if not self.started_scheduler: if not self.started_scheduler:
self.scheduler.start().addErrback(log_failure) def start_scheduler():
return self.scheduler.start().addErrback(log_failure)
run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True self.started_scheduler = True
# Fork off pushes to these services # Fork off pushes to these services

View file

@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
try: try:
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
room_end_token = now_token.room_key room_end_token = now_token.room_key
deferred_room_state = self.state_handler.get_current_state( deferred_room_state = run_in_background(
event.room_id self.state_handler.get_current_state,
event.room_id,
) )
elif event.membership == Membership.LEAVE: elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,) room_end_token = "s%d" % (event.stream_ordering,)
deferred_room_state = self.store.get_state_for_events( deferred_room_state = run_in_background(
[event.event_id], None self.store.get_state_for_events,
[event.event_id], None,
) )
deferred_room_state.addCallback( deferred_room_state.addCallback(
lambda states: states[event.event_id] lambda states: states[event.event_id]
@ -387,19 +389,21 @@ class InitialSyncHandler(BaseHandler):
receipts = [] receipts = []
defer.returnValue(receipts) defer.returnValue(receipts)
presence, receipts, (messages, token) = yield defer.gatherResults( presence, receipts, (messages, token) = yield make_deferred_yieldable(
[ defer.gatherResults(
run_in_background(get_presence), [
run_in_background(get_receipts), run_in_background(get_presence),
run_in_background( run_in_background(get_receipts),
self.store.get_recent_events_for_room, run_in_background(
room_id, self.store.get_recent_events_for_room,
limit=limit, room_id,
end_token=now_token.room_key, limit=limit,
) end_token=now_token.room_key,
], )
consumeErrors=True, ],
).addErrback(unwrapFirstError) consumeErrors=True,
).addErrback(unwrapFirstError),
)
messages = yield filter_events_for_client( messages = yield filter_events_for_client(
self.store, user_id, messages, is_peeking=is_peeking, self.store, user_id, messages, is_peeking=is_peeking,

View file

@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.defer import succeed from twisted.internet.defer import succeed
from twisted.python.failure import Failure
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master from synapse.replication.http.send_event import send_event_to_master
from synapse.types import RoomAlias, RoomStreamToken, UserID from synapse.types import RoomAlias, UserID
from synapse.util.async import Linearizer, ReadWriteLock from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler from ._base import BaseHandler
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PurgeStatus(object): class MessageHandler(object):
"""Object tracking the status of a purge request """Contains some read only APIs to get state about a room
This class contains information on the progress of a purge request, for
return by get_purge_status.
Attributes:
status (int): Tracks whether this request has completed. One of
STATUS_{ACTIVE,COMPLETE,FAILED}
""" """
STATUS_ACTIVE = 0
STATUS_COMPLETE = 1
STATUS_FAILED = 2
STATUS_TEXT = {
STATUS_ACTIVE: "active",
STATUS_COMPLETE: "complete",
STATUS_FAILED: "failed",
}
def __init__(self):
self.status = PurgeStatus.STATUS_ACTIVE
def asdict(self):
return {
"status": PurgeStatus.STATUS_TEXT[self.status]
}
class MessageHandler(BaseHandler):
def __init__(self, hs): def __init__(self, hs):
super(MessageHandler, self).__init__(hs) self.auth = hs.get_auth()
self.hs = hs
self.state = hs.get_state_handler()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.state = hs.get_state_handler()
self.pagination_lock = ReadWriteLock() self.store = hs.get_datastore()
self._purges_in_progress_by_room = set()
# map from purge id to PurgeStatus
self._purges_by_id = {}
def start_purge_history(self, room_id, token,
delete_local_events=False):
"""Start off a history purge on a room.
Args:
room_id (str): The room to purge from
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Returns:
str: unique ID for this purge transaction.
"""
if room_id in self._purges_in_progress_by_room:
raise SynapseError(
400,
"History purge already in progress for %s" % (room_id, ),
)
purge_id = random_string(16)
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[purge] starting purge_id %s", purge_id)
self._purges_by_id[purge_id] = PurgeStatus()
run_in_background(
self._purge_history,
purge_id, room_id, token, delete_local_events,
)
return purge_id
@defer.inlineCallbacks
def _purge_history(self, purge_id, room_id, token,
delete_local_events):
"""Carry out a history purge on a room.
Args:
purge_id (str): The id for this purge
room_id (str): The room to purge from
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Returns:
Deferred
"""
self._purges_in_progress_by_room.add(room_id)
try:
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(
room_id, token, delete_local_events,
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
finally:
self._purges_in_progress_by_room.discard(room_id)
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
"""Get the current status of an active purge
Args:
purge_id (str): purge_id returned by start_purge_history
Returns:
PurgeStatus|None
"""
return self._purges_by_id.get(purge_id)
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True, event_filter=None):
"""Get messages in a room.
Args:
requester (Requester): The user requesting messages.
room_id (str): The room they want messages from.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config rules to apply, if any.
as_client_event (bool): True to get events in client-server format.
event_filter (Filter): Filter to apply to results or None
Returns:
dict: Pagination API results
"""
user_id = requester.user.to_string()
if pagin_config.from_token:
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
yield self.hs.get_event_sources().get_current_token_for_room(
room_id=room_id
)
)
room_token = pagin_config.from_token.room_key
room_token = RoomStreamToken.parse(room_token)
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
"room_key", str(room_token)
)
source_config = pagin_config.get_source_config("room")
with (yield self.pagination_lock.read(room_id)):
membership, member_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token(
room_id, room_token.stream
)
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
events, next_key = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
direction=source_config.direction,
limit=source_config.limit,
event_filter=event_filter,
)
next_token = pagin_config.from_token.copy_and_replace(
"room_key", next_key
)
if not events:
defer.returnValue({
"chunk": [],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
})
if event_filter:
events = event_filter.filter(events)
events = yield filter_events_for_client(
self.store,
user_id,
events,
is_peeking=(member_event_id is None),
)
time_now = self.clock.time_msec()
chunk = {
"chunk": [
serialize_event(e, time_now, as_client_event)
for e in events
],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}
defer.returnValue(chunk)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None, def get_room_data(self, user_id=None, room_id=None,
@ -286,12 +64,12 @@ class MessageHandler(BaseHandler):
Raises: Raises:
SynapseError if something went wrong. SynapseError if something went wrong.
""" """
membership, membership_event_id = yield self._check_in_room_or_world_readable( membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id room_id, user_id
) )
if membership == Membership.JOIN: if membership == Membership.JOIN:
data = yield self.state_handler.get_current_state( data = yield self.state.get_current_state(
room_id, event_type, state_key room_id, event_type, state_key
) )
elif membership == Membership.LEAVE: elif membership == Membership.LEAVE:
@ -303,31 +81,6 @@ class MessageHandler(BaseHandler):
defer.returnValue(data) defer.returnValue(data)
@defer.inlineCallbacks
def _check_in_room_or_world_readable(self, room_id, user_id):
try:
# check_user_was_in_room will return the most recent membership
# event for the user if:
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
defer.returnValue((member_event.membership, member_event.event_id))
return
except AuthError:
visibility = yield self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if (
visibility and
visibility.content["history_visibility"] == "world_readable"
):
defer.returnValue((Membership.JOIN, None))
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False): def get_state_events(self, user_id, room_id, is_guest=False):
"""Retrieve all state events for a given room. If the user is """Retrieve all state events for a given room. If the user is
@ -340,12 +93,12 @@ class MessageHandler(BaseHandler):
Returns: Returns:
A list of dicts representing state events. [{}, {}, {}] A list of dicts representing state events. [{}, {}, {}]
""" """
membership, membership_event_id = yield self._check_in_room_or_world_readable( membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id room_id, user_id
) )
if membership == Membership.JOIN: if membership == Membership.JOIN:
room_state = yield self.state_handler.get_current_state(room_id) room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE: elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events( room_state = yield self.store.get_state_for_events(
[membership_event_id], None [membership_event_id], None
@ -373,7 +126,7 @@ class MessageHandler(BaseHandler):
if not requester.app_service: if not requester.app_service:
# We check AS auth after fetching the room membership, as it # We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway. # requires us to pull out all joined members anyway.
membership, _ = yield self._check_in_room_or_world_readable( membership, _ = yield self.auth.check_in_room_or_world_readable(
room_id, user_id room_id, user_id
) )
if membership != Membership.JOIN: if membership != Membership.JOIN:

View file

@ -0,0 +1,265 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
# Copyright 2017 - 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.types import RoomStreamToken
from synapse.util.async import ReadWriteLock
from synapse.util.logcontext import run_in_background
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
class PurgeStatus(object):
"""Object tracking the status of a purge request
This class contains information on the progress of a purge request, for
return by get_purge_status.
Attributes:
status (int): Tracks whether this request has completed. One of
STATUS_{ACTIVE,COMPLETE,FAILED}
"""
STATUS_ACTIVE = 0
STATUS_COMPLETE = 1
STATUS_FAILED = 2
STATUS_TEXT = {
STATUS_ACTIVE: "active",
STATUS_COMPLETE: "complete",
STATUS_FAILED: "failed",
}
def __init__(self):
self.status = PurgeStatus.STATUS_ACTIVE
def asdict(self):
return {
"status": PurgeStatus.STATUS_TEXT[self.status]
}
class PaginationHandler(object):
"""Handles pagination and purge history requests.
These are in the same handler due to the fact we need to block clients
paginating during a purge.
"""
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.pagination_lock = ReadWriteLock()
self._purges_in_progress_by_room = set()
# map from purge id to PurgeStatus
self._purges_by_id = {}
def start_purge_history(self, room_id, token,
delete_local_events=False):
"""Start off a history purge on a room.
Args:
room_id (str): The room to purge from
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Returns:
str: unique ID for this purge transaction.
"""
if room_id in self._purges_in_progress_by_room:
raise SynapseError(
400,
"History purge already in progress for %s" % (room_id, ),
)
purge_id = random_string(16)
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[purge] starting purge_id %s", purge_id)
self._purges_by_id[purge_id] = PurgeStatus()
run_in_background(
self._purge_history,
purge_id, room_id, token, delete_local_events,
)
return purge_id
@defer.inlineCallbacks
def _purge_history(self, purge_id, room_id, token,
delete_local_events):
"""Carry out a history purge on a room.
Args:
purge_id (str): The id for this purge
room_id (str): The room to purge from
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Returns:
Deferred
"""
self._purges_in_progress_by_room.add(room_id)
try:
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(
room_id, token, delete_local_events,
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
finally:
self._purges_in_progress_by_room.discard(room_id)
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
"""Get the current status of an active purge
Args:
purge_id (str): purge_id returned by start_purge_history
Returns:
PurgeStatus|None
"""
return self._purges_by_id.get(purge_id)
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True, event_filter=None):
"""Get messages in a room.
Args:
requester (Requester): The user requesting messages.
room_id (str): The room they want messages from.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config rules to apply, if any.
as_client_event (bool): True to get events in client-server format.
event_filter (Filter): Filter to apply to results or None
Returns:
dict: Pagination API results
"""
user_id = requester.user.to_string()
if pagin_config.from_token:
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
yield self.hs.get_event_sources().get_current_token_for_room(
room_id=room_id
)
)
room_token = pagin_config.from_token.room_key
room_token = RoomStreamToken.parse(room_token)
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
"room_key", str(room_token)
)
source_config = pagin_config.get_source_config("room")
with (yield self.pagination_lock.read(room_id)):
membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token(
room_id, room_token.stream
)
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
events, next_key = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
direction=source_config.direction,
limit=source_config.limit,
event_filter=event_filter,
)
next_token = pagin_config.from_token.copy_and_replace(
"room_key", next_key
)
if not events:
defer.returnValue({
"chunk": [],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
})
if event_filter:
events = event_filter.filter(events)
events = yield filter_events_for_client(
self.store,
user_id,
events,
is_peeking=(member_event_id is None),
)
time_now = self.clock.time_msec()
chunk = {
"chunk": [
serialize_event(e, time_now, as_client_event)
for e in events
],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}
defer.returnValue(chunk)

View file

@ -395,7 +395,11 @@ class RoomCreationHandler(BaseHandler):
) )
class RoomContextHandler(BaseHandler): class RoomContextHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
@defer.inlineCallbacks @defer.inlineCallbacks
def get_event_context(self, user, room_id, event_id, limit): def get_event_context(self, user, room_id, event_id, limit):
"""Retrieves events, pagination tokens and state around a given event """Retrieves events, pagination tokens and state around a given event

View file

@ -244,7 +244,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer) hs (synapse.server.HomeServer)
""" """
super(PurgeHistoryRestServlet, self).__init__(hs) super(PurgeHistoryRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers() self.pagination_handler = hs.get_pagination_handler()
self.store = hs.get_datastore() self.store = hs.get_datastore()
@defer.inlineCallbacks @defer.inlineCallbacks
@ -319,7 +319,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
errcode=Codes.BAD_JSON, errcode=Codes.BAD_JSON,
) )
purge_id = yield self.handlers.message_handler.start_purge_history( purge_id = yield self.pagination_handler.start_purge_history(
room_id, token, room_id, token,
delete_local_events=delete_local_events, delete_local_events=delete_local_events,
) )
@ -341,7 +341,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer) hs (synapse.server.HomeServer)
""" """
super(PurgeHistoryStatusRestServlet, self).__init__(hs) super(PurgeHistoryStatusRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers() self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, purge_id): def on_GET(self, request, purge_id):
@ -351,7 +351,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
if not is_admin: if not is_admin:
raise AuthError(403, "You are not a server admin") raise AuthError(403, "You are not a server admin")
purge_status = self.handlers.message_handler.get_purge_status(purge_id) purge_status = self.pagination_handler.get_purge_status(purge_id)
if purge_status is None: if purge_status is None:
raise NotFoundError("purge id '%s' not found" % purge_id) raise NotFoundError("purge id '%s' not found" % purge_id)

View file

@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
self.handlers = hs.get_handlers() self.handlers = hs.get_handlers()
self.event_creation_hander = hs.get_event_creation_handler() self.event_creation_hander = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler() self.room_member_handler = hs.get_room_member_handler()
self.message_handler = hs.get_message_handler()
def register(self, http_server): def register(self, http_server):
# /room/$roomid/state/$eventtype # /room/$roomid/state/$eventtype
@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
format = parse_string(request, "format", default="content", format = parse_string(request, "format", default="content",
allowed_values=["content", "event"]) allowed_values=["content", "event"])
msg_handler = self.handlers.message_handler msg_handler = self.message_handler
data = yield msg_handler.get_room_data( data = yield msg_handler.get_room_data(
user_id=requester.user.to_string(), user_id=requester.user.to_string(),
room_id=room_id, room_id=room_id,
@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(RoomMemberListRestServlet, self).__init__(hs) super(RoomMemberListRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers() self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id): def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens) # TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request) requester = yield self.auth.get_user_by_req(request)
handler = self.handlers.message_handler events = yield self.message_handler.get_state_events(
events = yield handler.get_state_events(
room_id=room_id, room_id=room_id,
user_id=requester.user.to_string(), user_id=requester.user.to_string(),
) )
@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(JoinedRoomMemberListRestServlet, self).__init__(hs) super(JoinedRoomMemberListRestServlet, self).__init__(hs)
self.message_handler = hs.get_handlers().message_handler self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id): def on_GET(self, request, room_id):
@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(RoomMessageListRestServlet, self).__init__(hs) super(RoomMessageListRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers() self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id): def on_GET(self, request, room_id):
@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
event_filter = Filter(json.loads(filter_json)) event_filter = Filter(json.loads(filter_json))
else: else:
event_filter = None event_filter = None
handler = self.handlers.message_handler msgs = yield self.pagination_handler.get_messages(
msgs = yield handler.get_messages(
room_id=room_id, room_id=room_id,
requester=requester, requester=requester,
pagin_config=pagination_config, pagin_config=pagination_config,
@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(RoomStateRestServlet, self).__init__(hs) super(RoomStateRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers() self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id): def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester = yield self.auth.get_user_by_req(request, allow_guest=True)
handler = self.handlers.message_handler
# Get all the current state for this room # Get all the current state for this room
events = yield handler.get_state_events( events = yield self.message_handler.get_state_events(
room_id=room_id, room_id=room_id,
user_id=requester.user.to_string(), user_id=requester.user.to_string(),
is_guest=requester.is_guest, is_guest=requester.is_guest,
@ -525,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(RoomEventContextServlet, self).__init__(hs) super(RoomEventContextServlet, self).__init__(hs)
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.handlers = hs.get_handlers() self.room_context_handler = hs.get_room_context_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id, event_id): def on_GET(self, request, room_id, event_id):
@ -533,7 +531,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
limit = parse_integer(request, "limit", default=10) limit = parse_integer(request, "limit", default=10)
results = yield self.handlers.room_context_handler.get_event_context( results = yield self.room_context_handler.get_event_context(
requester.user, requester.user,
room_id, room_id,
event_id, event_id,

View file

@ -52,12 +52,13 @@ from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.message import EventCreationHandler from synapse.handlers.message import EventCreationHandler, MessageHandler
from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.presence import PresenceHandler from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import ProfileHandler from synapse.handlers.profile import ProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.room import RoomCreationHandler from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import RoomMemberMasterHandler from synapse.handlers.room_member import RoomMemberMasterHandler
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
@ -165,6 +166,9 @@ class HomeServer(object):
'federation_registry', 'federation_registry',
'server_notices_manager', 'server_notices_manager',
'server_notices_sender', 'server_notices_sender',
'message_handler',
'pagination_handler',
'room_context_handler',
] ]
def __init__(self, hostname, reactor=None, **kwargs): def __init__(self, hostname, reactor=None, **kwargs):
@ -431,6 +435,15 @@ class HomeServer(object):
return WorkerServerNoticesSender(self) return WorkerServerNoticesSender(self)
return ServerNoticesSender(self) return ServerNoticesSender(self)
def build_message_handler(self):
return MessageHandler(self)
def build_pagination_handler(self):
return PaginationHandler(self)
def build_room_context_handler(self):
return RoomContextHandler(self)
def remove_pusher(self, app_id, push_key, user_id): def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

View file

@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -147,7 +147,8 @@ class _EventPeristenceQueue(object):
# callbacks on the deferred. # callbacks on the deferred.
try: try:
ret = yield per_item_callback(item) ret = yield per_item_callback(item)
item.deferred.callback(ret) with PreserveLoggingContext():
item.deferred.callback(ret)
except Exception: except Exception:
item.deferred.errback() item.deferred.errback()
finally: finally:
@ -417,18 +418,28 @@ class EventsStore(EventsWorkerStore):
logger.info( logger.info(
"Calculating state delta for room %s", room_id, "Calculating state delta for room %s", room_id,
) )
current_state = yield self._get_new_state_after_events(
room_id, with Measure(
ev_ctx_rm, self._clock,
latest_event_ids, "persist_events.get_new_state_after_events",
new_latest_event_ids, ):
) current_state = yield self._get_new_state_after_events(
room_id,
ev_ctx_rm,
latest_event_ids,
new_latest_event_ids,
)
if current_state is not None: if current_state is not None:
current_state_for_room[room_id] = current_state current_state_for_room[room_id] = current_state
delta = yield self._calculate_state_delta( with Measure(
room_id, current_state, self._clock,
) "persist_events.calculate_state_delta",
state_delta_for_room[room_id] = delta ):
delta = yield self._calculate_state_delta(
room_id, current_state,
)
state_delta_for_room[room_id] = delta
yield self.runInteraction( yield self.runInteraction(
"persist_events", "persist_events",

View file

@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
) )
if newly_inserted: if newly_inserted:
self.runInteraction( yield self.runInteraction(
"add_pusher", "add_pusher",
self._invalidate_cache_and_stream, self._invalidate_cache_and_stream,
self.get_if_user_has_pusher, (user_id,) self.get_if_user_has_pusher, (user_id,)