Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

commit aa1bf10b91

--- /dev/null
+++ b/changelog.d/3350.misc
@@ -0,0 +1 @@
+Remove redundant checks on who_forgot_in_room

--- /dev/null
+++ b/changelog.d/3555.feature
@@ -0,0 +1 @@
+Add support for client_reader to handle more APIs

--- /dev/null
+++ b/changelog.d/3586.misc
@@ -0,0 +1 @@
+Fixes and optimisations for resolve_state_groups

--- /dev/null
+++ b/changelog.d/3587.misc
@@ -0,0 +1 @@
+Improve logging for exceptions when handling PDUs

--- /dev/null
+++ b/changelog.d/3590.misc
@@ -0,0 +1 @@
+Add some measure blocks to persist_events

--- /dev/null
+++ b/changelog.d/3591.misc
@@ -0,0 +1 @@
+Fix some random logcontext leaks.

--- /dev/null
+++ b/changelog.d/3592.misc
@@ -0,0 +1 @@
+Speed up calculating state deltas in persist_event loop

--- /dev/null
+++ b/changelog.d/3595.misc
@@ -0,0 +1 @@
+Attempt to reduce amount of state pulled out of DB during persist_events

--- a/docs/workers.rst
+++ b/docs/workers.rst
@@ -206,6 +206,10 @@ Handles client API endpoints. It can handle REST endpoints matching the
 following regular expressions::

     ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$

 ``synapse.app.user_dir``
 ~~~~~~~~~~~~~~~~~~~~~~~~

--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -739,3 +739,37 @@ class Auth(object):
             )

         return query_params[0]
+
+    @defer.inlineCallbacks
+    def check_in_room_or_world_readable(self, room_id, user_id):
+        """Checks that the user is or was in the room or the room is world
+        readable. If it isn't then an exception is raised.
+
+        Returns:
+            Deferred[tuple[str, str|None]]: Resolves to the current membership of
+                the user in the room and the membership event ID of the user. If
+                the user is not in the room and never has been, then
+                `(Membership.JOIN, None)` is returned.
+        """
+
+        try:
+            # check_user_was_in_room will return the most recent membership
+            # event for the user if:
+            #  * The user is a non-guest user, and was ever in the room
+            #  * The user is a guest user, and has joined the room
+            # else it will throw.
+            member_event = yield self.check_user_was_in_room(room_id, user_id)
+            defer.returnValue((member_event.membership, member_event.event_id))
+        except AuthError:
+            visibility = yield self.state.get_current_state(
+                room_id, EventTypes.RoomHistoryVisibility, ""
+            )
+            if (
+                visibility and
+                visibility.content["history_visibility"] == "world_readable"
+            ):
+                defer.returnValue((Membership.JOIN, None))
+                return
+            raise AuthError(
+                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+            )
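
The following usage sketch is not part of the diff; it shows how a handler is
expected to consume the new helper, assuming a class that keeps an ``Auth``
instance at ``self.auth`` (via ``hs.get_auth()``) and a hypothetical method
name::

    @defer.inlineCallbacks
    def _check_room_visibility(self, room_id, user_id):
        # raises AuthError unless the user is/was in the room or the room's
        # history visibility is world_readable
        membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
            room_id, user_id,
        )
        # (Membership.JOIN, None) means the room is world readable and the
        # user never joined, i.e. they are peeking
        defer.returnValue((membership, member_event_id is None))

The handlers changed later in this commit use exactly this call shape, passing
``is_peeking=(member_event_id is None)`` on to ``filter_events_for_client``.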

--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -40,7 +40,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.rest.client.v1.room import (
+    JoinedRoomMemberListRestServlet,
+    PublicRoomListRestServlet,
+    RoomEventContextServlet,
+    RoomMemberListRestServlet,
+    RoomStateRestServlet,
+)
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
@@ -82,7 +88,13 @@ class ClientReaderServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
+
                     PublicRoomListRestServlet(self).register(resource)
+                    RoomMemberListRestServlet(self).register(resource)
+                    JoinedRoomMemberListRestServlet(self).register(resource)
+                    RoomStateRestServlet(self).register(resource)
+                    RoomEventContextServlet(self).register(resource)
+
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,

--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -249,7 +249,7 @@ class EventContext(object):

     @defer.inlineCallbacks
     def update_state(self, state_group, prev_state_ids, current_state_ids,
-                     delta_ids):
+                     prev_group, delta_ids):
         """Replace the state in the context
         """

@@ -260,6 +260,7 @@ class EventContext(object):

         self.state_group = state_group
         self._prev_state_ids = prev_state_ids
+        self.prev_group = prev_group
         self._current_state_ids = current_state_ids
         self.delta_ids = delta_ids


--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -24,6 +24,7 @@ from prometheus_client import Counter

 from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
+from twisted.python import failure

 from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
@@ -186,8 +187,12 @@ class FederationServer(FederationBase):
                     logger.warn("Error handling PDU %s: %s", event_id, e)
                     pdu_results[event_id] = {"error": str(e)}
                 except Exception as e:
+                    f = failure.Failure()
                     pdu_results[event_id] = {"error": str(e)}
-                    logger.exception("Failed to handle PDU %s", event_id)
+                    logger.error(
+                        "Failed to handle PDU %s: %s",
+                        event_id, f.getTraceback().rstrip(),
+                    )

         yield async.concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(),
@@ -203,8 +208,8 @@ class FederationServer(FederationBase):
         )

         pdu_failures = getattr(transaction, "pdu_failures", [])
-        for failure in pdu_failures:
-            logger.info("Got failure %r", failure)
+        for fail in pdu_failures:
+            logger.info("Got failure %r", fail)

         response = {
             "pdus": pdu_results,
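
The logging change above swaps ``logger.exception`` for an explicitly captured
``twisted.python.failure.Failure``, so the traceback is rendered into the log
message itself. A standalone sketch of the idiom (illustrative, not Synapse
code; the event ID is made up)::

    import logging

    from twisted.python import failure

    logging.basicConfig()
    logger = logging.getLogger(__name__)

    try:
        raise RuntimeError("boom")
    except Exception:
        # Failure() with no arguments captures the exception currently being
        # handled, traceback included
        f = failure.Failure()
        logger.error(
            "Failed to handle PDU %s: %s",
            "$hypothetical:event.id", f.getTraceback().rstrip(),
        )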

--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,9 +17,7 @@ from .admin import AdminHandler
 from .directory import DirectoryHandler
 from .federation import FederationHandler
 from .identity import IdentityHandler
-from .message import MessageHandler
 from .register import RegistrationHandler
-from .room import RoomContextHandler
 from .search import SearchHandler


@@ -44,10 +42,8 @@ class Handlers(object):

     def __init__(self, hs):
         self.registration_handler = RegistrationHandler(hs)
-        self.message_handler = MessageHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
-        self.room_context_handler = RoomContextHandler(hs)

--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,6 +23,7 @@ from twisted.internet import defer

 import synapse
 from synapse.api.constants import EventTypes
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.metrics import Measure

@@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
                     yield self._check_user_exists(event.state_key)

                 if not self.started_scheduler:
-                    self.scheduler.start().addErrback(log_failure)
+                    def start_scheduler():
+                        return self.scheduler.start().addErrback(log_failure)
+                    run_as_background_process("as_scheduler", start_scheduler)
                     self.started_scheduler = True

                 # Fork off pushes to these services
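
Wrapping the scheduler start-up in ``run_as_background_process`` detaches it
from the logcontext of whichever request happened to trigger it: the work runs
under its own named context and shows up in the background-process metrics. A
minimal sketch of the call shape, assuming ``scheduler`` and ``log_failure``
are in scope as in the hunk above::

    from synapse.metrics.background_process_metrics import run_as_background_process

    def start_scheduler():
        # return the Deferred so the wrapper can observe completion/failure
        return scheduler.start().addErrback(log_failure)

    # "as_scheduler" is only a label used for metrics and log attribution
    run_as_background_process("as_scheduler", start_scheduler)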

--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1980,10 +1980,6 @@ class FederationHandler(BaseHandler):

         current_state_ids.update(state_updates)

-        if context.delta_ids is not None:
-            delta_ids = dict(context.delta_ids)
-            delta_ids.update(state_updates)
-
         prev_state_ids = yield context.get_prev_state_ids(self.store)
         prev_state_ids = dict(prev_state_ids)

@@ -1991,11 +1987,13 @@ class FederationHandler(BaseHandler):
             k: a.event_id for k, a in iteritems(auth_events)
         })

+        # create a new state group as a delta from the existing one.
+        prev_group = context.state_group
         state_group = yield self.store.store_state_group(
             event.event_id,
             event.room_id,
-            prev_group=context.prev_group,
-            delta_ids=delta_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
             current_state_ids=current_state_ids,
         )

@@ -2003,7 +2001,8 @@ class FederationHandler(BaseHandler):
             state_group=state_group,
             current_state_ids=current_state_ids,
             prev_state_ids=prev_state_ids,
-            delta_ids=delta_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
         )

     @defer.inlineCallbacks
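
The rewritten hunks base the new state group on the group the context already
had, passing the ``state_updates`` map itself as the delta. The underlying
representation is just "previous state plus a small dict of changes", as this
plain-Python illustration (with made-up event IDs) shows::

    prev_state = {
        ("m.room.member", "@a:example.com"): "$old",
        ("m.room.name", ""): "$name",
    }
    state_updates = {("m.room.member", "@a:example.com"): "$new"}

    # storing (prev_group, delta_ids=state_updates) is enough to rebuild
    # the full state of the new group on demand
    current_state = dict(prev_state)
    current_state.update(state_updates)

    assert current_state[("m.room.member", "@a:example.com")] == "$new"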

--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
            try:
                if event.membership == Membership.JOIN:
                    room_end_token = now_token.room_key
-                    deferred_room_state = self.state_handler.get_current_state(
-                        event.room_id
+                    deferred_room_state = run_in_background(
+                        self.state_handler.get_current_state,
+                        event.room_id,
                    )
                elif event.membership == Membership.LEAVE:
                    room_end_token = "s%d" % (event.stream_ordering,)
-                    deferred_room_state = self.store.get_state_for_events(
-                        [event.event_id], None
+                    deferred_room_state = run_in_background(
+                        self.store.get_state_for_events,
+                        [event.event_id], None,
                    )
                    deferred_room_state.addCallback(
                        lambda states: states[event.event_id]
@@ -388,19 +390,21 @@ class InitialSyncHandler(BaseHandler):
                receipts = []
            defer.returnValue(receipts)

-        presence, receipts, (messages, token) = yield defer.gatherResults(
-            [
-                run_in_background(get_presence),
-                run_in_background(get_receipts),
-                run_in_background(
-                    self.store.get_recent_events_for_room,
-                    room_id,
-                    limit=limit,
-                    end_token=now_token.room_key,
-                )
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        presence, receipts, (messages, token) = yield make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(get_presence),
+                    run_in_background(get_receipts),
+                    run_in_background(
+                        self.store.get_recent_events_for_room,
+                        room_id,
+                        limit=limit,
+                        end_token=now_token.room_key,
+                    )
+                ],
+                consumeErrors=True,
+            ).addErrback(unwrapFirstError),
+        )

        messages = yield filter_events_for_client(
            self.store, user_id, messages, is_peeking=is_peeking,
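
The pattern used in both hunks is the standard logcontext-safe way to run
several operations concurrently: ``run_in_background`` starts each one
immediately, and ``make_deferred_yieldable`` restores the calling context when
the combined result is yielded. A condensed sketch, assuming ``get_a`` and
``get_b`` are functions returning Deferreds::

    from twisted.internet import defer

    from synapse.util.logcontext import make_deferred_yieldable, run_in_background

    @defer.inlineCallbacks
    def fetch_both(get_a, get_b):
        a, b = yield make_deferred_yieldable(defer.gatherResults(
            [run_in_background(get_a), run_in_background(get_b)],
            consumeErrors=True,
        ))
        defer.returnValue((a, b))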

--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json

 from twisted.internet import defer
 from twisted.internet.defer import succeed
-from twisted.python.failure import Failure

 from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
@@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.send_event import send_event_to_master
-from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Linearizer, ReadWriteLock
+from synapse.types import RoomAlias, UserID
+from synapse.util.async import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
-from synapse.util.stringutils import random_string
-from synapse.visibility import filter_events_for_client

 from ._base import BaseHandler

 logger = logging.getLogger(__name__)


-class PurgeStatus(object):
-    """Object tracking the status of a purge request
-
-    This class contains information on the progress of a purge request, for
-    return by get_purge_status.
-
-    Attributes:
-        status (int): Tracks whether this request has completed. One of
-            STATUS_{ACTIVE,COMPLETE,FAILED}
-    """
-
-    STATUS_ACTIVE = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-
-    STATUS_TEXT = {
-        STATUS_ACTIVE: "active",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-    }
-
-    def __init__(self):
-        self.status = PurgeStatus.STATUS_ACTIVE
-
-    def asdict(self):
-        return {
-            "status": PurgeStatus.STATUS_TEXT[self.status]
-        }
-
-
-class MessageHandler(BaseHandler):
+class MessageHandler(object):
+    """Contains some read only APIs to get state about a room
+    """

     def __init__(self, hs):
-        super(MessageHandler, self).__init__(hs)
-        self.hs = hs
-        self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
         self.clock = hs.get_clock()
-
-        self.pagination_lock = ReadWriteLock()
-        self._purges_in_progress_by_room = set()
-        # map from purge id to PurgeStatus
-        self._purges_by_id = {}
-
-    def start_purge_history(self, room_id, token,
-                            delete_local_events=False):
-        """Start off a history purge on a room.
-
-        Args:
-            room_id (str): The room to purge from
-
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            str: unique ID for this purge transaction.
-        """
-        if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400,
-                "History purge already in progress for %s" % (room_id, ),
-            )
-
-        purge_id = random_string(16)
-
-        # we log the purge_id here so that it can be tied back to the
-        # request id in the log lines.
-        logger.info("[purge] starting purge_id %s", purge_id)
-
-        self._purges_by_id[purge_id] = PurgeStatus()
-        run_in_background(
-            self._purge_history,
-            purge_id, room_id, token, delete_local_events,
-        )
-        return purge_id
-
-    @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, token,
-                       delete_local_events):
-        """Carry out a history purge on a room.
-
-        Args:
-            purge_id (str): The id for this purge
-            room_id (str): The room to purge from
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            Deferred
-        """
-        self._purges_in_progress_by_room.add(room_id)
-        try:
-            with (yield self.pagination_lock.write(room_id)):
-                yield self.store.purge_history(
-                    room_id, token, delete_local_events,
-                )
-            logger.info("[purge] complete")
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
-        except Exception:
-            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
-
-        # remove the purge from the list 24 hours after it completes
-        def clear_purge():
-            del self._purges_by_id[purge_id]
-        self.hs.get_reactor().callLater(24 * 3600, clear_purge)
-
-    def get_purge_status(self, purge_id):
-        """Get the current status of an active purge
-
-        Args:
-            purge_id (str): purge_id returned by start_purge_history
-
-        Returns:
-            PurgeStatus|None
-        """
-        return self._purges_by_id.get(purge_id)
-
-    @defer.inlineCallbacks
-    def get_messages(self, requester, room_id=None, pagin_config=None,
-                     as_client_event=True, event_filter=None):
-        """Get messages in a room.
-
-        Args:
-            requester (Requester): The user requesting messages.
-            room_id (str): The room they want messages from.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-                config rules to apply, if any.
-            as_client_event (bool): True to get events in client-server format.
-            event_filter (Filter): Filter to apply to results or None
-        Returns:
-            dict: Pagination API results
-        """
-        user_id = requester.user.to_string()
-
-        if pagin_config.from_token:
-            room_token = pagin_config.from_token.room_key
-        else:
-            pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token_for_room(
-                    room_id=room_id
-                )
-            )
-            room_token = pagin_config.from_token.room_key
-
-        room_token = RoomStreamToken.parse(room_token)
-
-        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
-            "room_key", str(room_token)
-        )
-
-        source_config = pagin_config.get_source_config("room")
-
-        with (yield self.pagination_lock.read(room_id)):
-            membership, member_event_id = yield self._check_in_room_or_world_readable(
-                room_id, user_id
-            )
-
-            if source_config.direction == 'b':
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    max_topo = room_token.topological
-                else:
-                    max_topo = yield self.store.get_max_topological_token(
-                        room_id, room_token.stream
-                    )
-
-                if membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-                    leave_token = yield self.store.get_topological_token_for_event(
-                        member_event_id
-                    )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
-                        source_config.from_key = str(leave_token)
-
-                yield self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
-                )
-
-            events, next_key = yield self.store.paginate_room_events(
-                room_id=room_id,
-                from_key=source_config.from_key,
-                to_key=source_config.to_key,
-                direction=source_config.direction,
-                limit=source_config.limit,
-                event_filter=event_filter,
-            )
-
-            next_token = pagin_config.from_token.copy_and_replace(
-                "room_key", next_key
-            )
-
-        if not events:
-            defer.returnValue({
-                "chunk": [],
-                "start": pagin_config.from_token.to_string(),
-                "end": next_token.to_string(),
-            })
-
-        if event_filter:
-            events = event_filter.filter(events)
-
-        events = yield filter_events_for_client(
-            self.store,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
-
-        time_now = self.clock.time_msec()
-
-        chunk = {
-            "chunk": [
-                serialize_event(e, time_now, as_client_event)
-                for e in events
-            ],
-            "start": pagin_config.from_token.to_string(),
-            "end": next_token.to_string(),
-        }
-
-        defer.returnValue(chunk)
+        self.state = hs.get_state_handler()
+        self.store = hs.get_datastore()

     @defer.inlineCallbacks
     def get_room_data(self, user_id=None, room_id=None,
@@ -286,12 +64,12 @@ class MessageHandler(BaseHandler):
         Raises:
             SynapseError if something went wrong.
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )

         if membership == Membership.JOIN:
-            data = yield self.state_handler.get_current_state(
+            data = yield self.state.get_current_state(
                 room_id, event_type, state_key
             )
         elif membership == Membership.LEAVE:
@@ -303,31 +81,6 @@ class MessageHandler(BaseHandler):

         defer.returnValue(data)

-    @defer.inlineCallbacks
-    def _check_in_room_or_world_readable(self, room_id, user_id):
-        try:
-            # check_user_was_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            defer.returnValue((member_event.membership, member_event.event_id))
-            return
-        except AuthError:
-            visibility = yield self.state_handler.get_current_state(
-                room_id, EventTypes.RoomHistoryVisibility, ""
-            )
-            if (
-                visibility and
-                visibility.content["history_visibility"] == "world_readable"
-            ):
-                defer.returnValue((Membership.JOIN, None))
-                return
-            raise AuthError(
-                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
-            )
-
     @defer.inlineCallbacks
     def get_state_events(self, user_id, room_id, is_guest=False):
         """Retrieve all state events for a given room. If the user is
@@ -340,12 +93,12 @@ class MessageHandler(BaseHandler):
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )

         if membership == Membership.JOIN:
-            room_state = yield self.state_handler.get_current_state(room_id)
+            room_state = yield self.state.get_current_state(room_id)
         elif membership == Membership.LEAVE:
             room_state = yield self.store.get_state_for_events(
                 [membership_event_id], None
@@ -373,7 +126,7 @@ class MessageHandler(BaseHandler):
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
-            membership, _ = yield self._check_in_room_or_world_readable(
+            membership, _ = yield self.auth.check_in_room_or_world_readable(
                 room_id, user_id
             )
             if membership != Membership.JOIN:

--- /dev/null
+++ b/synapse/handlers/pagination.py
@@ -0,0 +1,265 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
+from synapse.api.constants import Membership
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+from synapse.types import RoomStreamToken
+from synapse.util.async import ReadWriteLock
+from synapse.util.logcontext import run_in_background
+from synapse.util.stringutils import random_string
+from synapse.visibility import filter_events_for_client
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
+class PaginationHandler(object):
+    """Handles pagination and purge history requests.
+
+    These are in the same handler due to the fact we need to block clients
+    paginating during a purge.
+    """
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+        self.pagination_lock = ReadWriteLock()
+        self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
+
+    def start_purge_history(self, room_id, token,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
+
+        Args:
+            room_id (str): The room to purge from
+
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            str: unique ID for this purge transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400,
+                "History purge already in progress for %s" % (room_id, ),
+            )
+
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, token, delete_local_events,
+        )
+        return purge_id
+
+    @defer.inlineCallbacks
+    def _purge_history(self, purge_id, room_id, token,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with (yield self.pagination_lock.write(room_id)):
+                yield self.store.purge_history(
+                    room_id, token, delete_local_events,
+                )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
+
+        # remove the purge from the list 24 hours after it completes
+        def clear_purge():
+            del self._purges_by_id[purge_id]
+        self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
+
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
+
+    @defer.inlineCallbacks
+    def get_messages(self, requester, room_id=None, pagin_config=None,
+                     as_client_event=True, event_filter=None):
+        """Get messages in a room.
+
+        Args:
+            requester (Requester): The user requesting messages.
+            room_id (str): The room they want messages from.
+            pagin_config (synapse.api.streams.PaginationConfig): The pagination
+                config rules to apply, if any.
+            as_client_event (bool): True to get events in client-server format.
+            event_filter (Filter): Filter to apply to results or None
+        Returns:
+            dict: Pagination API results
+        """
+        user_id = requester.user.to_string()
+
+        if pagin_config.from_token:
+            room_token = pagin_config.from_token.room_key
+        else:
+            pagin_config.from_token = (
+                yield self.hs.get_event_sources().get_current_token_for_room(
+                    room_id=room_id
+                )
+            )
+            room_token = pagin_config.from_token.room_key
+
+        room_token = RoomStreamToken.parse(room_token)
+
+        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+            "room_key", str(room_token)
+        )
+
+        source_config = pagin_config.get_source_config("room")
+
+        with (yield self.pagination_lock.read(room_id)):
+            membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
+                room_id, user_id
+            )
+
+            if source_config.direction == 'b':
+                # if we're going backwards, we might need to backfill. This
+                # requires that we have a topo token.
+                if room_token.topological:
+                    max_topo = room_token.topological
+                else:
+                    max_topo = yield self.store.get_max_topological_token(
+                        room_id, room_token.stream
+                    )
+
+                if membership == Membership.LEAVE:
+                    # If they have left the room then clamp the token to be before
+                    # they left the room, to save the effort of loading from the
+                    # database.
+                    leave_token = yield self.store.get_topological_token_for_event(
+                        member_event_id
+                    )
+                    leave_token = RoomStreamToken.parse(leave_token)
+                    if leave_token.topological < max_topo:
+                        source_config.from_key = str(leave_token)
+
+                yield self.hs.get_handlers().federation_handler.maybe_backfill(
+                    room_id, max_topo
+                )
+
+            events, next_key = yield self.store.paginate_room_events(
+                room_id=room_id,
+                from_key=source_config.from_key,
+                to_key=source_config.to_key,
+                direction=source_config.direction,
+                limit=source_config.limit,
+                event_filter=event_filter,
+            )
+
+            next_token = pagin_config.from_token.copy_and_replace(
+                "room_key", next_key
+            )
+
+        if not events:
+            defer.returnValue({
+                "chunk": [],
+                "start": pagin_config.from_token.to_string(),
+                "end": next_token.to_string(),
+            })
+
+        if event_filter:
+            events = event_filter.filter(events)
+
+        events = yield filter_events_for_client(
+            self.store,
+            user_id,
+            events,
+            is_peeking=(member_event_id is None),
+        )
+
+        time_now = self.clock.time_msec()
+
+        chunk = {
+            "chunk": [
+                serialize_event(e, time_now, as_client_event)
+                for e in events
+            ],
+            "start": pagin_config.from_token.to_string(),
+            "end": next_token.to_string(),
+        }
+
+        defer.returnValue(chunk)
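
Driving the relocated purge machinery looks like this (a sketch assuming
``hs`` is a fully built HomeServer; the admin servlets changed below do the
same thing)::

    pagination_handler = hs.get_pagination_handler()

    purge_id = pagination_handler.start_purge_history(
        room_id, token, delete_local_events=False,
    )

    # later, e.g. from the status endpoint:
    status = pagination_handler.get_purge_status(purge_id)
    if status is not None:
        print(status.asdict())  # {"status": "active" | "complete" | "failed"}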

--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -395,7 +395,11 @@ class RoomCreationHandler(BaseHandler):
         )


-class RoomContextHandler(BaseHandler):
+class RoomContextHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+
     @defer.inlineCallbacks
     def get_event_context(self, user, room_id, event_id, limit):
         """Retrieves events, pagination tokens and state around a given event

--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -244,7 +244,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
             hs (synapse.server.HomeServer)
         """
         super(PurgeHistoryRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.pagination_handler = hs.get_pagination_handler()
         self.store = hs.get_datastore()

     @defer.inlineCallbacks
@@ -319,7 +319,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                 errcode=Codes.BAD_JSON,
             )

-        purge_id = yield self.handlers.message_handler.start_purge_history(
+        purge_id = yield self.pagination_handler.start_purge_history(
             room_id, token,
             delete_local_events=delete_local_events,
         )
@@ -341,7 +341,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
             hs (synapse.server.HomeServer)
         """
         super(PurgeHistoryStatusRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.pagination_handler = hs.get_pagination_handler()

     @defer.inlineCallbacks
     def on_GET(self, request, purge_id):
@@ -351,7 +351,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
         if not is_admin:
             raise AuthError(403, "You are not a server admin")

-        purge_status = self.handlers.message_handler.get_purge_status(purge_id)
+        purge_status = self.pagination_handler.get_purge_status(purge_id)
         if purge_status is None:
             raise NotFoundError("purge id '%s' not found" % purge_id)


--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
         self.handlers = hs.get_handlers()
         self.event_creation_hander = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
+        self.message_handler = hs.get_message_handler()

     def register(self, http_server):
         # /room/$roomid/state/$eventtype
@@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
         format = parse_string(request, "format", default="content",
                               allowed_values=["content", "event"])

-        msg_handler = self.handlers.message_handler
+        msg_handler = self.message_handler
         data = yield msg_handler.get_room_data(
             user_id=requester.user.to_string(),
             room_id=room_id,
@@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet):

     def __init__(self, hs):
         super(RoomMemberListRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.message_handler = hs.get_message_handler()

     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
         # TODO support Pagination stream API (limit/tokens)
         requester = yield self.auth.get_user_by_req(request)
-        handler = self.handlers.message_handler
-        events = yield handler.get_state_events(
+        events = yield self.message_handler.get_state_events(
             room_id=room_id,
             user_id=requester.user.to_string(),
         )
@@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):

     def __init__(self, hs):
         super(JoinedRoomMemberListRestServlet, self).__init__(hs)
-        self.message_handler = hs.get_handlers().message_handler
+        self.message_handler = hs.get_message_handler()

     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
@@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):

     def __init__(self, hs):
         super(RoomMessageListRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.pagination_handler = hs.get_pagination_handler()

     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
@@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
             event_filter = Filter(json.loads(filter_json))
         else:
             event_filter = None
-        handler = self.handlers.message_handler
-        msgs = yield handler.get_messages(
+        msgs = yield self.pagination_handler.get_messages(
             room_id=room_id,
             requester=requester,
             pagin_config=pagination_config,
@@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet):

     def __init__(self, hs):
         super(RoomStateRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.message_handler = hs.get_message_handler()

     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
-        handler = self.handlers.message_handler
         # Get all the current state for this room
-        events = yield handler.get_state_events(
+        events = yield self.message_handler.get_state_events(
             room_id=room_id,
             user_id=requester.user.to_string(),
             is_guest=requester.is_guest,
@@ -525,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(RoomEventContextServlet, self).__init__(hs)
         self.clock = hs.get_clock()
-        self.handlers = hs.get_handlers()
+        self.room_context_handler = hs.get_room_context_handler()

     @defer.inlineCallbacks
     def on_GET(self, request, room_id, event_id):
@@ -533,7 +531,7 @@ class RoomEventContextServlet(ClientV1RestServlet):

         limit = parse_integer(request, "limit", default=10)

-        results = yield self.handlers.room_context_handler.get_event_context(
+        results = yield self.room_context_handler.get_event_context(
             requester.user,
             room_id,
             event_id,

--- a/synapse/server.py
+++ b/synapse/server.py
@@ -52,12 +52,13 @@ from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.handlers.groups_local import GroupsLocalHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
-from synapse.handlers.message import EventCreationHandler
+from synapse.handlers.message import EventCreationHandler, MessageHandler
+from synapse.handlers.pagination import PaginationHandler
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.profile import ProfileHandler
 from synapse.handlers.read_marker import ReadMarkerHandler
 from synapse.handlers.receipts import ReceiptsHandler
-from synapse.handlers.room import RoomCreationHandler
+from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
 from synapse.handlers.room_list import RoomListHandler
 from synapse.handlers.room_member import RoomMemberMasterHandler
 from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
@@ -165,6 +166,9 @@ class HomeServer(object):
         'federation_registry',
         'server_notices_manager',
         'server_notices_sender',
+        'message_handler',
+        'pagination_handler',
+        'room_context_handler',
     ]

     def __init__(self, hostname, reactor=None, **kwargs):
@@ -431,6 +435,15 @@ class HomeServer(object):
             return WorkerServerNoticesSender(self)
         return ServerNoticesSender(self)

+    def build_message_handler(self):
+        return MessageHandler(self)
+
+    def build_pagination_handler(self):
+        return PaginationHandler(self)
+
+    def build_room_context_handler(self):
+        return RoomContextHandler(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

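
Listing 'pagination_handler' in DEPENDENCIES is what makes
``hs.get_pagination_handler()`` exist: HomeServer synthesises a cached
``get_<name>()`` accessor for each entry, backed by the matching
``build_<name>()`` method. A much-simplified sketch of that convention
(illustrative only, not the real implementation)::

    class MiniHomeServer(object):
        DEPENDENCIES = ["pagination_handler"]

        def __init__(self):
            self._cache = {}

        def build_pagination_handler(self):
            return object()  # stand-in for PaginationHandler(self)

        def __getattr__(self, name):
            # e.g. get_pagination_handler -> cached build_pagination_handler()
            if name.startswith("get_") and name[4:] in self.DEPENDENCIES:
                dep = name[4:]

                def getter():
                    if dep not in self._cache:
                        self._cache[dep] = getattr(self, "build_" + dep)()
                    return self._cache[dep]
                return getter
            raise AttributeError(name)

    hs = MiniHomeServer()
    assert hs.get_pagination_handler() is hs.get_pagination_handler()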

--- a/synapse/state.py
+++ b/synapse/state.py
@@ -471,69 +471,39 @@ class StateResolutionHandler(object):
                 "Resolving state for %s with %d groups", room_id, len(state_groups_ids)
             )

-            # build a map from state key to the event_ids which set that state.
-            # dict[(str, str), set[str])
-            state = {}
+            # start by assuming we won't have any conflicted state, and build up the new
+            # state map by iterating through the state groups. If we discover a conflict,
+            # we give up and instead use `resolve_events_with_factory`.
+            #
+            # XXX: is this actually worthwhile, or should we just let
+            # resolve_events_with_factory do it?
+            new_state = {}
+            conflicted_state = False
             for st in itervalues(state_groups_ids):
                 for key, e_id in iteritems(st):
-                    state.setdefault(key, set()).add(e_id)
-
-            # build a map from state key to the event_ids which set that state,
-            # including only those where there are state keys in conflict.
-            conflicted_state = {
-                k: list(v)
-                for k, v in iteritems(state)
-                if len(v) > 1
-            }
+                    if key in new_state:
+                        conflicted_state = True
+                        break
+                    new_state[key] = e_id
+                if conflicted_state:
+                    break

             if conflicted_state:
                 logger.info("Resolving conflicted state for %r", room_id)
                 with Measure(self.clock, "state._resolve_events"):
                     new_state = yield resolve_events_with_factory(
-                        list(state_groups_ids.values()),
+                        list(itervalues(state_groups_ids)),
                         event_map=event_map,
                         state_map_factory=state_map_factory,
                     )
-            else:
-                new_state = {
-                    key: e_ids.pop() for key, e_ids in iteritems(state)
-                }
-
-            # if the new state matches any of the input state groups, we can
-            # use that state group again. Otherwise we will generate a state_id
-            # which will be used as a cache key for future resolutions, but
-            # not get persisted.

             with Measure(self.clock, "state.create_group_ids"):
-                # if the new state matches any of the input state groups, we can
-                # use that state group again. Otherwise we will generate a state_id
-                # which will be used as a cache key for future resolutions, but
-                # not get persisted.
-                state_group = None
-                new_state_event_ids = frozenset(itervalues(new_state))
-                for sg, events in iteritems(state_groups_ids):
-                    if new_state_event_ids == frozenset(e_id for e_id in events):
-                        state_group = sg
-                        break
-
-                # TODO: We want to create a state group for this set of events, to
-                # increase cache hits, but we need to make sure that it doesn't
-                # end up as a prev_group without being added to the database
-
-                prev_group = None
-                delta_ids = None
-                for old_group, old_ids in iteritems(state_groups_ids):
-                    if not set(new_state) - set(old_ids):
-                        n_delta_ids = {
-                            k: v
-                            for k, v in iteritems(new_state)
-                            if old_ids.get(k) != v
-                        }
-                        if not delta_ids or len(n_delta_ids) < len(delta_ids):
-                            prev_group = old_group
-                            delta_ids = n_delta_ids
-
-                cache = _StateCacheEntry(
-                    state=new_state,
-                    state_group=state_group,
-                    prev_group=prev_group,
-                    delta_ids=delta_ids,
-                )
+                cache = _make_state_cache_entry(new_state, state_groups_ids)

             if self._state_cache is not None:
                 self._state_cache[group_names] = cache
@@ -541,6 +511,70 @@ class StateResolutionHandler(object):
         defer.returnValue(cache)


+def _make_state_cache_entry(
+    new_state,
+    state_groups_ids,
+):
+    """Given a resolved state, and a set of input state groups, pick one to base
+    a new state group on (if any), and return an appropriately-constructed
+    _StateCacheEntry.
+
+    Args:
+        new_state (dict[(str, str), str]): resolved state map (mapping from
+           (type, state_key) to event_id)
+
+        state_groups_ids (dict[int, dict[(str, str), str]]):
+            map from state group id to the state in that state group
+            (where 'state' is a map from state key to event id)
+
+    Returns:
+        _StateCacheEntry
+    """
+    # if the new state matches any of the input state groups, we can
+    # use that state group again. Otherwise we will generate a state_id
+    # which will be used as a cache key for future resolutions, but
+    # not get persisted.
+
+    # first look for exact matches
+    new_state_event_ids = set(itervalues(new_state))
+    for sg, state in iteritems(state_groups_ids):
+        if len(new_state_event_ids) != len(state):
+            continue
+
+        old_state_event_ids = set(itervalues(state))
+        if new_state_event_ids == old_state_event_ids:
+            # got an exact match.
+            return _StateCacheEntry(
+                state=new_state,
+                state_group=sg,
+            )
+
+    # TODO: We want to create a state group for this set of events, to
+    # increase cache hits, but we need to make sure that it doesn't
+    # end up as a prev_group without being added to the database
+
+    # failing that, look for the closest match.
+    prev_group = None
+    delta_ids = None
+
+    for old_group, old_state in iteritems(state_groups_ids):
+        n_delta_ids = {
+            k: v
+            for k, v in iteritems(new_state)
+            if old_state.get(k) != v
+        }
+        if not delta_ids or len(n_delta_ids) < len(delta_ids):
+            prev_group = old_group
+            delta_ids = n_delta_ids
+
+    return _StateCacheEntry(
+        state=new_state,
+        state_group=None,
+        prev_group=prev_group,
+        delta_ids=delta_ids,
+    )
+
+
 def _ordered_events(events):
     def key_func(e):
         return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest()
@@ -582,7 +616,7 @@ def _seperate(state_sets):
     with them in different state sets.

     Args:
-        state_sets(list[dict[(str, str), str]]):
+        state_sets(iterable[dict[(str, str), str]]):
             List of dicts of (type, state_key) -> event_id, which are the
             different state groups to resolve.

@@ -596,10 +630,11 @@ def _seperate(state_sets):
         conflicted_state is a dict mapping (type, state_key) to a set of
         event ids for conflicted state keys.
     """
-    unconflicted_state = dict(state_sets[0])
+    state_set_iterator = iter(state_sets)
+    unconflicted_state = dict(next(state_set_iterator))
    conflicted_state = {}

-    for state_set in state_sets[1:]:
+    for state_set in state_set_iterator:
        for key, value in iteritems(state_set):
            # Check if there is an unconflicted entry for the state key.
            unconflicted_value = unconflicted_state.get(key)
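
_make_state_cache_entry's fallback search just minimises the size of the
delta. A worked plain-Python example with two input groups and made-up event
IDs (simplified, outside Synapse)::

    state_groups_ids = {
        1: {("m.room.name", ""): "$n1", ("m.room.topic", ""): "$t"},
        2: {("m.room.name", ""): "$n2", ("m.room.topic", ""): "$t"},
    }
    new_state = {("m.room.name", ""): "$n2", ("m.room.topic", ""): "$t"}

    prev_group = None
    delta_ids = None
    for old_group, old_state in sorted(state_groups_ids.items()):
        # entries that would have to change, were we to start from old_group
        n_delta_ids = {
            k: v for k, v in new_state.items() if old_state.get(k) != v
        }
        if delta_ids is None or len(n_delta_ids) < len(delta_ids):
            prev_group, delta_ids = old_group, n_delta_ids

    # group 2 already matches, so the delta against it is empty
    assert (prev_group, delta_ids) == (2, {})

In the function itself an exact match is detected first and returned as a
reusable state_group, skipping the delta search entirely.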

--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ import logging
 from collections import OrderedDict, deque, namedtuple
 from functools import wraps

-from six import iteritems, itervalues
+from six import iteritems
 from six.moves import range

 from canonicaljson import json
@@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure

@@ -147,7 +147,8 @@ class _EventPeristenceQueue(object):
             # callbacks on the deferred.
             try:
                 ret = yield per_item_callback(item)
-                item.deferred.callback(ret)
+                with PreserveLoggingContext():
+                    item.deferred.callback(ret)
             except Exception:
                 item.deferred.errback()
             finally:
@@ -343,11 +344,14 @@ class EventsStore(EventsWorkerStore):
                 new_forward_extremeties = {}

                 # map room_id->(type,state_key)->event_id tracking the full
-                # state in each room after adding these events
+                # state in each room after adding these events.
+                # This is simply used to prefill the get_current_state_ids
+                # cache
                 current_state_for_room = {}

-                # map room_id->(to_delete, to_insert) where each entry is
-                # a map (type,key)->event_id giving the state delta in each
+                # map room_id->(to_delete, to_insert) where to_delete is a list
+                # of type/state keys to remove from current state, and to_insert
+                # is a map (type,key)->event_id giving the state delta in each
                 # room
                 state_delta_for_room = {}

@@ -417,19 +421,40 @@ class EventsStore(EventsWorkerStore):
                             logger.info(
                                 "Calculating state delta for room %s", room_id,
                             )
-                            current_state = yield self._get_new_state_after_events(
-                                room_id,
-                                ev_ctx_rm,
-                                latest_event_ids,
-                                new_latest_event_ids,
-                            )
+                            with Measure(
+                                self._clock,
+                                "persist_events.get_new_state_after_events",
+                            ):
+                                res = yield self._get_new_state_after_events(
+                                    room_id,
+                                    ev_ctx_rm,
+                                    latest_event_ids,
+                                    new_latest_event_ids,
+                                )
+                                current_state, delta_ids = res
+
+                            # If either are not None then there has been
+                            # a change, and we need to work out the delta
+                            # (or use that given)
+                            if delta_ids is not None:
+                                # If there is a delta we know that we've
+                                # only added or replaced state, never
+                                # removed keys entirely.
+                                state_delta_for_room[room_id] = ([], delta_ids)
+                            elif current_state is not None:
+                                with Measure(
+                                    self._clock,
+                                    "persist_events.calculate_state_delta",
+                                ):
+                                    delta = yield self._calculate_state_delta(
+                                        room_id, current_state,
+                                    )
+                                    state_delta_for_room[room_id] = delta
+
+                            # If we have the current_state then lets prefill
+                            # the cache with it.
                             if current_state is not None:
                                 current_state_for_room[room_id] = current_state
-                                delta = yield self._calculate_state_delta(
-                                    room_id, current_state,
-                                )
-                                if delta is not None:
-                                    state_delta_for_room[room_id] = delta

                 yield self.runInteraction(
                     "persist_events",
@@ -528,9 +553,15 @@ class EventsStore(EventsWorkerStore):
             the new forward extremities for the room.

         Returns:
-            Deferred[dict[(str,str), str]|None]:
-                None if there are no changes to the room state, or
-                a dict of (type, state_key) -> event_id].
+            Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
+                Returns a tuple of two state maps, the first being the full new current
+                state and the second being the delta to the existing current state.
+                If both are None then there has been no change.
+
+                If there has been a change then we only return the delta if its
+                already been calculated. Conversely if we do know the delta then
+                the new current state is only returned if we've already calculated
+                it.
         """

         if not new_latest_event_ids:
@@ -538,6 +569,10 @@ class EventsStore(EventsWorkerStore):

         # map from state_group to ((type, key) -> event_id) state map
         state_groups_map = {}
+
+        # Map from (prev state group, new state group) -> delta state dict
+        state_group_deltas = {}
+
         for ev, ctx in events_context:
             if ctx.state_group is None:
                 # I don't think this can happen, but let's double-check
@@ -556,6 +591,9 @@ class EventsStore(EventsWorkerStore):
             if current_state_ids is not None:
                 state_groups_map[ctx.state_group] = current_state_ids

+            if ctx.prev_group:
+                state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
+
             # We need to map the event_ids to their state groups. First, let's
             # check if the event is one we're persisting, in which case we can
             # pull the state group from its context.
@@ -597,7 +635,26 @@ class EventsStore(EventsWorkerStore):
         # If they old and new groups are the same then we don't need to do
         # anything.
         if old_state_groups == new_state_groups:
-            return
+            defer.returnValue((None, None))
+
+        if len(new_state_groups) == 1 and len(old_state_groups) == 1:
+            # If we're going from one state group to another, lets check if
+            # we have a delta for that transition. If we do then we can just
+            # return that.
+
+            new_state_group = next(iter(new_state_groups))
+            old_state_group = next(iter(old_state_groups))
+
+            delta_ids = state_group_deltas.get(
+                (old_state_group, new_state_group,), None
+            )
+            if delta_ids is not None:
+                # We have a delta from the existing to new current state,
+                # so lets just return that. If we happen to already have
+                # the current state in memory then lets also return that,
+                # but it doesn't matter if we don't.
+                new_state = state_groups_map.get(new_state_group)
+                defer.returnValue((new_state, delta_ids))

         # Now that we have calculated new_state_groups we need to get
         # their state IDs so we can resolve to a single state set.
@@ -609,7 +666,7 @@ class EventsStore(EventsWorkerStore):
         if len(new_state_groups) == 1:
             # If there is only one state group, then we know what the current
             # state is.
-            defer.returnValue(state_groups_map[new_state_groups.pop()])
+            defer.returnValue((state_groups_map[new_state_groups.pop()], None))

         # Ok, we need to defer to the state handler to resolve our state sets.

@@ -628,7 +685,7 @@ class EventsStore(EventsWorkerStore):
             room_id, state_groups, events_map, get_events
         )

-        defer.returnValue(res.state)
+        defer.returnValue((res.state, None))

     @defer.inlineCallbacks
     def _calculate_state_delta(self, room_id, current_state):
@@ -637,28 +694,20 @@ class EventsStore(EventsWorkerStore):
         Assumes that we are only persisting events for one room at a time.

         Returns:
-            2-tuple (to_delete, to_insert) where both are state dicts,
-            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
-            first be deleted from current_state_events, `to_insert` are entries
-            to insert.
+            tuple[list, dict] (to_delete, to_insert): where to_delete are the
+                type/state_keys to remove from current_state_events and `to_insert`
+                are the updates to current_state_events.
         """
         existing_state = yield self.get_current_state_ids(room_id)

-        existing_events = set(itervalues(existing_state))
-        new_events = set(ev_id for ev_id in itervalues(current_state))
-        changed_events = existing_events ^ new_events
+        to_delete = [
+            key for key in existing_state
+            if key not in current_state
+        ]

-        if not changed_events:
-            return
-
-        to_delete = {
-            key: ev_id for key, ev_id in iteritems(existing_state)
-            if ev_id in changed_events
-        }
-        events_to_insert = (new_events - existing_events)
         to_insert = {
             key: ev_id for key, ev_id in iteritems(current_state)
-            if ev_id in events_to_insert
+            if ev_id != existing_state.get(key)
         }

         defer.returnValue((to_delete, to_insert))
@@ -681,10 +730,10 @@ class EventsStore(EventsWorkerStore):
             delete_existing (bool): True to purge existing table rows for the
                 events from the database. This is useful when retrying due to
                 IntegrityError.
-            state_delta_for_room (dict[str, (list[str], list[str])]):
+            state_delta_for_room (dict[str, (list, dict)]):
                 The current-state delta for each room. For each room, a tuple
-                (to_delete, to_insert), being a list of event ids to be removed
-                from the current state, and a list of event ids to be added to
+                (to_delete, to_insert), being a list of type/state keys to be
+                removed from the current state, and a state set to be added to
                 the current state.
             new_forward_extremeties (dict[str, list[str]]):
                 The new forward extremities for each room. For each room, a
@@ -762,9 +811,46 @@ class EventsStore(EventsWorkerStore):
     def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
         for room_id, current_state_tuple in iteritems(state_delta_by_room):
             to_delete, to_insert = current_state_tuple
+
+            # First we add entries to the current_state_delta_stream. We
+            # do this before updating the current_state_events table so
+            # that we can use it to calculate the `prev_event_id`. (This
+            # allows us to not have to pull out the existing state
+            # unnecessarily).
+            sql = """
+                INSERT INTO current_state_delta_stream
+                (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                SELECT ?, ?, ?, ?, ?, (
+                    SELECT event_id FROM current_state_events
+                    WHERE room_id = ? AND type = ? AND state_key = ?
+                )
+            """
+            txn.executemany(sql, (
+                (
+                    max_stream_order, room_id, etype, state_key, None,
+                    room_id, etype, state_key,
+                )
+                for etype, state_key in to_delete
+                # We sanity check that we're deleting rather than updating
+                if (etype, state_key) not in to_insert
+            ))
+            txn.executemany(sql, (
+                (
+                    max_stream_order, room_id, etype, state_key, ev_id,
+                    room_id, etype, state_key,
+                )
+                for (etype, state_key), ev_id in iteritems(to_insert)
+            ))
+
+            # Now we actually update the current_state_events table
+
             txn.executemany(
-                "DELETE FROM current_state_events WHERE event_id = ?",
-                [(ev_id,) for ev_id in itervalues(to_delete)],
+                "DELETE FROM current_state_events"
+                " WHERE room_id = ? AND type = ? AND state_key = ?",
+                (
+                    (room_id, etype, state_key)
+                    for etype, state_key in itertools.chain(to_delete, to_insert)
+                ),
             )

             self._simple_insert_many_txn(
@@ -781,25 +867,6 @@ class EventsStore(EventsWorkerStore):
                 ],
             )

-            state_deltas = {key: None for key in to_delete}
-            state_deltas.update(to_insert)
-
-            self._simple_insert_many_txn(
-                txn,
-                table="current_state_delta_stream",
-                values=[
-                    {
-                        "stream_id": max_stream_order,
-                        "room_id": room_id,
-                        "type": key[0],
-                        "state_key": key[1],
-                        "event_id": ev_id,
-                        "prev_event_id": to_delete.get(key, None),
-                    }
-                    for key, ev_id in iteritems(state_deltas)
-                ]
-            )
-
             txn.call_after(
                 self._curr_state_delta_stream_cache.entity_has_changed,
                 room_id, max_stream_order,
@@ -813,7 +880,8 @@ class EventsStore(EventsWorkerStore):
             # and which we have added, then we invlidate the caches for all
             # those users.
             members_changed = set(
-                state_key for ev_type, state_key in state_deltas
+                state_key
+                for ev_type, state_key in itertools.chain(to_delete, to_insert)
                 if ev_type == EventTypes.Member
             )

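
The new _calculate_state_delta shape is easy to see on a toy example:
to_delete lists keys that vanish outright, while to_insert carries both
replaced and brand-new entries (the event IDs here are hypothetical)::

    existing_state = {("m.room.name", ""): "$old", ("m.room.topic", ""): "$t"}
    current_state = {("m.room.name", ""): "$new", ("m.room.avatar", ""): "$a"}

    to_delete = [key for key in existing_state if key not in current_state]
    to_insert = {
        key: ev_id for key, ev_id in current_state.items()
        if ev_id != existing_state.get(key)
    }

    assert to_delete == [("m.room.topic", "")]
    assert to_insert == {
        ("m.room.name", ""): "$new",
        ("m.room.avatar", ""): "$a",
    }

This is also why the SQL above sanity-checks that a deleted key is not
simultaneously in to_insert: by construction, deletions and updates are now
disjoint.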

--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -21,7 +21,6 @@ from canonicaljson import json

 from twisted.internet import defer

-from synapse.api.constants import EventTypes
 from synapse.push.baserules import list_with_base_rules
 from synapse.storage.appservice import ApplicationServiceWorkerStore
 from synapse.storage.pusher import PusherWorkerStore
@@ -250,18 +249,6 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
             if uid in local_users_in_room:
                 user_ids.add(uid)

-        forgotten = yield self.who_forgot_in_room(
-            event.room_id, on_invalidate=cache_context.invalidate,
-        )
-
-        for row in forgotten:
-            user_id = row["user_id"]
-            event_id = row["event_id"]
-
-            mem_id = current_state_ids.get((EventTypes.Member, user_id), None)
-            if event_id == mem_id:
-                user_ids.discard(user_id)
-
         rules_by_user = yield self.bulk_get_push_rules(
             user_ids, on_invalidate=cache_context.invalidate,
         )

--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
         )

         if newly_inserted:
-            self.runInteraction(
+            yield self.runInteraction(
                 "add_pusher",
                 self._invalidate_cache_and_stream,
                 self.get_if_user_has_pusher, (user_id,)
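
The one-word fix above matters because an un-yielded runInteraction drops the
returned Deferred: failures are swallowed and callers can proceed before the
cache invalidation lands. A toy reproduction of the hazard, with a
hypothetical stand-in for runInteraction::

    from twisted.internet import defer

    def run_interaction(desc):
        # stand-in for the real runInteraction, which returns a Deferred
        return defer.fail(RuntimeError("db error"))

    @defer.inlineCallbacks
    def add_pusher():
        # with the yield, the failure propagates to our caller; without it,
        # the error would be lost as an unhandled Deferred
        yield run_interaction("add_pusher")

    d = add_pusher()
    # the errback fires with the RuntimeError, proving it was propagated
    d.addErrback(lambda f: f.trap(RuntimeError))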

--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -468,18 +468,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
     def _get_joined_hosts_cache(self, room_id):
         return _JoinedHostsCache(self, room_id)

-    @cached()
-    def who_forgot_in_room(self, room_id):
-        return self._simple_select_list(
-            table="room_memberships",
-            retcols=("user_id", "event_id"),
-            keyvalues={
-                "room_id": room_id,
-                "forgotten": 1,
-            },
-            desc="who_forgot"
-        )
-

 class RoomMemberStore(RoomMemberWorkerStore):
     def __init__(self, db_conn, hs):
@@ -588,9 +576,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
             txn.execute(sql, (user_id, room_id))

             txn.call_after(self.did_forget.invalidate, (user_id, room_id))
-            self._invalidate_cache_and_stream(
-                txn, self.who_forgot_in_room, (room_id,)
-            )
         return self.runInteraction("forget_membership", f)

     @cachedInlineCallbacks(num_args=2)

--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -24,7 +24,6 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.events.utils import prune_event
 from synapse.types import get_domain_from_id
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn

 logger = logging.getLogger(__name__)

@@ -76,19 +75,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
         types=types,
     )

-    forgotten = yield make_deferred_yieldable(defer.gatherResults([
-        defer.maybeDeferred(
-            preserve_fn(store.who_forgot_in_room),
-            room_id,
-        )
-        for room_id in frozenset(e.room_id for e in events)
-    ], consumeErrors=True))
-
-    # Set of membership event_ids that have been forgotten
-    event_id_forgotten = frozenset(
-        row["event_id"] for rows in forgotten for row in rows
-    )
-
     ignore_dict_content = yield store.get_global_account_data_by_type_for_user(
         "m.ignored_user_list", user_id,
     )
@@ -177,10 +163,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
         if membership is None:
             membership_event = state.get((EventTypes.Member, user_id), None)
             if membership_event:
-                # XXX why do we do this?
-                # https://github.com/matrix-org/synapse/issues/3350
-                if membership_event.event_id not in event_id_forgotten:
-                    membership = membership_event.membership
+                membership = membership_event.membership

         # if the user was a member of the room at the time of the event,
         # they can see it.