0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-15 23:13:50 +01:00

Merge branch 'develop' into release-v0.11.0

This commit is contained in:
Erik Johnston 2015-11-13 11:40:17 +00:00
commit da3dd4867d
18 changed files with 473 additions and 141 deletions

View file

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
import argparse import argparse
import errno
import os import os
import yaml import yaml
import sys import sys
@ -91,8 +92,11 @@ class Config(object):
@classmethod @classmethod
def ensure_directory(cls, dir_path): def ensure_directory(cls, dir_path):
dir_path = cls.abspath(dir_path) dir_path = cls.abspath(dir_path)
if not os.path.exists(dir_path): try:
os.makedirs(dir_path) os.makedirs(dir_path)
except OSError, e:
if e.errno != errno.EEXIST:
raise
if not os.path.isdir(dir_path): if not os.path.isdir(dir_path):
raise ConfigError( raise ConfigError(
"%s is not a directory" % (dir_path,) "%s is not a directory" % (dir_path,)

View file

@ -357,7 +357,8 @@ class FederationClient(FederationBase):
defer.returnValue(signed_auth) defer.returnValue(signed_auth)
@defer.inlineCallbacks @defer.inlineCallbacks
def make_membership_event(self, destinations, room_id, user_id, membership): def make_membership_event(self, destinations, room_id, user_id, membership,
content={},):
""" """
Creates an m.room.member event, with context, without participating in the room. Creates an m.room.member event, with context, without participating in the room.
@ -398,6 +399,8 @@ class FederationClient(FederationBase):
logger.debug("Got response to make_%s: %s", membership, pdu_dict) logger.debug("Got response to make_%s: %s", membership, pdu_dict)
pdu_dict["content"].update(content)
defer.returnValue( defer.returnValue(
(destination, self.event_from_pdu_json(pdu_dict)) (destination, self.event_from_pdu_json(pdu_dict))
) )

View file

@ -29,6 +29,12 @@ logger = logging.getLogger(__name__)
class BaseHandler(object): class BaseHandler(object):
"""
Common base class for the event handlers.
:type store: synapse.storage.events.StateStore
:type state_handler: synapse.state.StateHandler
"""
def __init__(self, hs): def __init__(self, hs):
self.store = hs.get_datastore() self.store = hs.get_datastore()

View file

@ -564,7 +564,7 @@ class FederationHandler(BaseHandler):
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
def do_invite_join(self, target_hosts, room_id, joinee): def do_invite_join(self, target_hosts, room_id, joinee, content):
""" Attempts to join the `joinee` to the room `room_id` via the """ Attempts to join the `joinee` to the room `room_id` via the
server `target_host`. server `target_host`.
@ -584,7 +584,8 @@ class FederationHandler(BaseHandler):
target_hosts, target_hosts,
room_id, room_id,
joinee, joinee,
"join" "join",
content,
) )
self.room_queues[room_id] = [] self.room_queues[room_id] = []
@ -840,12 +841,14 @@ class FederationHandler(BaseHandler):
defer.returnValue(None) defer.returnValue(None)
@defer.inlineCallbacks @defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership): def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
content={},):
origin, pdu = yield self.replication_layer.make_membership_event( origin, pdu = yield self.replication_layer.make_membership_event(
target_hosts, target_hosts,
room_id, room_id,
user_id, user_id,
membership membership,
content,
) )
logger.debug("Got response to make_%s: %s", membership, pdu) logger.debug("Got response to make_%s: %s", membership, pdu)

View file

@ -258,20 +258,30 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _check_in_room_or_world_readable(self, room_id, user_id, is_guest): def _check_in_room_or_world_readable(self, room_id, user_id, is_guest):
if is_guest: try:
# check_user_was_in_room will return the most recent membership
# event for the user if:
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
defer.returnValue((member_event.membership, member_event.event_id))
return
except AuthError, auth_error:
visibility = yield self.state_handler.get_current_state( visibility = yield self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, "" room_id, EventTypes.RoomHistoryVisibility, ""
) )
if visibility.content["history_visibility"] == "world_readable": if (
visibility and
visibility.content["history_visibility"] == "world_readable"
):
defer.returnValue((Membership.JOIN, None)) defer.returnValue((Membership.JOIN, None))
return return
else: if not is_guest:
raise AuthError( raise auth_error
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN raise AuthError(
) 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
else: )
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
defer.returnValue((member_event.membership, member_event.event_id))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False): def get_state_events(self, user_id, room_id, is_guest=False):
@ -456,7 +466,7 @@ class MessageHandler(BaseHandler):
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks @defer.inlineCallbacks
def room_initial_sync(self, user_id, room_id, pagin_config=None): def room_initial_sync(self, user_id, room_id, pagin_config=None, is_guest=False):
"""Capture the a snapshot of a room. If user is currently a member of """Capture the a snapshot of a room. If user is currently a member of
the room this will be what is currently in the room. If the user left the room this will be what is currently in the room. If the user left
the room this will be what was in the room when they left. the room this will be what was in the room when they left.
@ -473,15 +483,19 @@ class MessageHandler(BaseHandler):
A JSON serialisable dict with the snapshot of the room. A JSON serialisable dict with the snapshot of the room.
""" """
member_event = yield self.auth.check_user_was_in_room(room_id, user_id) membership, member_event_id = yield self._check_in_room_or_world_readable(
room_id,
user_id,
is_guest
)
if member_event.membership == Membership.JOIN: if membership == Membership.JOIN:
result = yield self._room_initial_sync_joined( result = yield self._room_initial_sync_joined(
user_id, room_id, pagin_config, member_event user_id, room_id, pagin_config, membership, is_guest
) )
elif member_event.membership == Membership.LEAVE: elif membership == Membership.LEAVE:
result = yield self._room_initial_sync_parted( result = yield self._room_initial_sync_parted(
user_id, room_id, pagin_config, member_event user_id, room_id, pagin_config, membership, member_event_id, is_guest
) )
private_user_data = [] private_user_data = []
@ -497,19 +511,19 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _room_initial_sync_parted(self, user_id, room_id, pagin_config, def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
member_event): membership, member_event_id, is_guest):
room_state = yield self.store.get_state_for_events( room_state = yield self.store.get_state_for_events(
[member_event.event_id], None [member_event_id], None
) )
room_state = room_state[member_event.event_id] room_state = room_state[member_event_id]
limit = pagin_config.limit if pagin_config else None limit = pagin_config.limit if pagin_config else None
if limit is None: if limit is None:
limit = 10 limit = 10
stream_token = yield self.store.get_stream_token_for_event( stream_token = yield self.store.get_stream_token_for_event(
member_event.event_id member_event_id
) )
messages, token = yield self.store.get_recent_events_for_room( messages, token = yield self.store.get_recent_events_for_room(
@ -519,7 +533,7 @@ class MessageHandler(BaseHandler):
) )
messages = yield self._filter_events_for_client( messages = yield self._filter_events_for_client(
user_id, messages user_id, messages, is_guest=is_guest
) )
start_token = StreamToken(token[0], 0, 0, 0, 0) start_token = StreamToken(token[0], 0, 0, 0, 0)
@ -528,7 +542,7 @@ class MessageHandler(BaseHandler):
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
defer.returnValue({ defer.returnValue({
"membership": member_event.membership, "membership": membership,
"room_id": room_id, "room_id": room_id,
"messages": { "messages": {
"chunk": [serialize_event(m, time_now) for m in messages], "chunk": [serialize_event(m, time_now) for m in messages],
@ -542,7 +556,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _room_initial_sync_joined(self, user_id, room_id, pagin_config, def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
member_event): membership, is_guest):
current_state = yield self.state.get_current_state( current_state = yield self.state.get_current_state(
room_id=room_id, room_id=room_id,
) )
@ -574,12 +588,14 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_presence(): def get_presence():
states = yield presence_handler.get_states( states = {}
target_users=[UserID.from_string(m.user_id) for m in room_members], if not is_guest:
auth_user=auth_user, states = yield presence_handler.get_states(
as_event=True, target_users=[UserID.from_string(m.user_id) for m in room_members],
check_auth=False, auth_user=auth_user,
) as_event=True,
check_auth=False,
)
defer.returnValue(states.values()) defer.returnValue(states.values())
@ -599,7 +615,7 @@ class MessageHandler(BaseHandler):
).addErrback(unwrapFirstError) ).addErrback(unwrapFirstError)
messages = yield self._filter_events_for_client( messages = yield self._filter_events_for_client(
user_id, messages user_id, messages, is_guest=is_guest, require_all_visible_for_guests=False
) )
start_token = now_token.copy_and_replace("room_key", token[0]) start_token = now_token.copy_and_replace("room_key", token[0])
@ -607,8 +623,7 @@ class MessageHandler(BaseHandler):
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
defer.returnValue({ ret = {
"membership": member_event.membership,
"room_id": room_id, "room_id": room_id,
"messages": { "messages": {
"chunk": [serialize_event(m, time_now) for m in messages], "chunk": [serialize_event(m, time_now) for m in messages],
@ -618,4 +633,8 @@ class MessageHandler(BaseHandler):
"state": state, "state": state,
"presence": presence, "presence": presence,
"receipts": receipts, "receipts": receipts,
}) }
if not is_guest:
ret["membership"] = membership
defer.returnValue(ret)

View file

@ -504,7 +504,8 @@ class RoomMemberHandler(BaseHandler):
yield handler.do_invite_join( yield handler.do_invite_join(
room_hosts, room_hosts,
room_id, room_id,
event.user_id event.user_id,
event.content,
) )
else: else:
logger.debug("Doing normal join") logger.debug("Doing normal join")

View file

@ -47,9 +47,9 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id", "room_id", # str
"timeline", "timeline", # TimelineBatch
"state", "state", # dict[(str, str), FrozenEvent]
"ephemeral", "ephemeral",
"private_user_data", "private_user_data",
])): ])):
@ -68,9 +68,9 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id", "room_id", # str
"timeline", "timeline", # TimelineBatch
"state", "state", # dict[(str, str), FrozenEvent]
"private_user_data", "private_user_data",
])): ])):
__slots__ = [] __slots__ = []
@ -87,8 +87,8 @@ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
"room_id", "room_id", # str
"invite", "invite", # FrozenEvent: the invite event
])): ])):
__slots__ = [] __slots__ = []
@ -254,15 +254,12 @@ class SyncHandler(BaseHandler):
room_id, sync_config, now_token, since_token=timeline_since_token room_id, sync_config, now_token, since_token=timeline_since_token
) )
current_state = yield self.state_handler.get_current_state( current_state = yield self.get_state_at(room_id, now_token)
room_id
)
current_state_events = current_state.values()
defer.returnValue(JoinedSyncResult( defer.returnValue(JoinedSyncResult(
room_id=room_id, room_id=room_id,
timeline=batch, timeline=batch,
state=current_state_events, state=current_state,
ephemeral=ephemeral_by_room.get(room_id, []), ephemeral=ephemeral_by_room.get(room_id, []),
private_user_data=self.private_user_data_for_room( private_user_data=self.private_user_data_for_room(
room_id, tags_by_room room_id, tags_by_room
@ -354,14 +351,12 @@ class SyncHandler(BaseHandler):
room_id, sync_config, leave_token, since_token=timeline_since_token room_id, sync_config, leave_token, since_token=timeline_since_token
) )
leave_state = yield self.store.get_state_for_events( leave_state = yield self.store.get_state_for_event(leave_event_id)
[leave_event_id], None
)
defer.returnValue(ArchivedSyncResult( defer.returnValue(ArchivedSyncResult(
room_id=room_id, room_id=room_id,
timeline=batch, timeline=batch,
state=leave_state[leave_event_id].values(), state=leave_state,
private_user_data=self.private_user_data_for_room( private_user_data=self.private_user_data_for_room(
room_id, tags_by_room room_id, tags_by_room
), ),
@ -425,6 +420,9 @@ class SyncHandler(BaseHandler):
if len(room_events) <= timeline_limit: if len(room_events) <= timeline_limit:
# There is no gap in any of the rooms. Therefore we can just # There is no gap in any of the rooms. Therefore we can just
# partition the new events by room and return them. # partition the new events by room and return them.
logger.debug("Got %i events for incremental sync - not limited",
len(room_events))
invite_events = [] invite_events = []
leave_events = [] leave_events = []
events_by_room_id = {} events_by_room_id = {}
@ -440,7 +438,12 @@ class SyncHandler(BaseHandler):
for room_id in joined_room_ids: for room_id in joined_room_ids:
recents = events_by_room_id.get(room_id, []) recents = events_by_room_id.get(room_id, [])
state = [event for event in recents if event.is_state()] logger.debug("Events for room %s: %r", room_id, recents)
state = {
(event.type, event.state_key): event
for event in recents if event.is_state()}
limited = False
if recents: if recents:
prev_batch = now_token.copy_and_replace( prev_batch = now_token.copy_and_replace(
"room_key", recents[0].internal_metadata.before "room_key", recents[0].internal_metadata.before
@ -448,9 +451,13 @@ class SyncHandler(BaseHandler):
else: else:
prev_batch = now_token prev_batch = now_token
state, limited = yield self.check_joined_room( just_joined = yield self.check_joined_room(sync_config, state)
sync_config, room_id, state if just_joined:
) logger.debug("User has just joined %s: needs full state",
room_id)
state = yield self.get_state_at(room_id, now_token)
# the timeline is inherently limited if we've just joined
limited = True
room_sync = JoinedSyncResult( room_sync = JoinedSyncResult(
room_id=room_id, room_id=room_id,
@ -465,10 +472,15 @@ class SyncHandler(BaseHandler):
room_id, tags_by_room room_id, tags_by_room
), ),
) )
logger.debug("Result for room %s: %r", room_id, room_sync)
if room_sync: if room_sync:
joined.append(room_sync) joined.append(room_sync)
else: else:
logger.debug("Got %i events for incremental sync - hit limit",
len(room_events))
invite_events = yield self.store.get_invites_for_user( invite_events = yield self.store.get_invites_for_user(
sync_config.user.to_string() sync_config.user.to_string()
) )
@ -507,6 +519,9 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def load_filtered_recents(self, room_id, sync_config, now_token, def load_filtered_recents(self, room_id, sync_config, now_token,
since_token=None): since_token=None):
"""
:returns a Deferred TimelineBatch
"""
limited = True limited = True
recents = [] recents = []
filtering_factor = 2 filtering_factor = 2
@ -558,6 +573,8 @@ class SyncHandler(BaseHandler):
Returns: Returns:
A Deferred JoinedSyncResult A Deferred JoinedSyncResult
""" """
logger.debug("Doing incremental sync for room %s between %s and %s",
room_id, since_token, now_token)
# TODO(mjark): Check for redactions we might have missed. # TODO(mjark): Check for redactions we might have missed.
@ -567,31 +584,26 @@ class SyncHandler(BaseHandler):
logging.debug("Recents %r", batch) logging.debug("Recents %r", batch)
# TODO(mjark): This seems racy since this isn't being passed a current_state = yield self.get_state_at(room_id, now_token)
# token to indicate what point in the stream this is
current_state = yield self.state_handler.get_current_state(
room_id
)
current_state_events = current_state.values()
state_at_previous_sync = yield self.get_state_at_previous_sync( state_at_previous_sync = yield self.get_state_at(
room_id, since_token=since_token room_id, stream_position=since_token
) )
state_events_delta = yield self.compute_state_delta( state = yield self.compute_state_delta(
since_token=since_token, since_token=since_token,
previous_state=state_at_previous_sync, previous_state=state_at_previous_sync,
current_state=current_state_events, current_state=current_state,
) )
state_events_delta, _ = yield self.check_joined_room( just_joined = yield self.check_joined_room(sync_config, state)
sync_config, room_id, state_events_delta if just_joined:
) state = yield self.get_state_at(room_id, now_token)
room_sync = JoinedSyncResult( room_sync = JoinedSyncResult(
room_id=room_id, room_id=room_id,
timeline=batch, timeline=batch,
state=state_events_delta, state=state,
ephemeral=ephemeral_by_room.get(room_id, []), ephemeral=ephemeral_by_room.get(room_id, []),
private_user_data=self.private_user_data_for_room( private_user_data=self.private_user_data_for_room(
room_id, tags_by_room room_id, tags_by_room
@ -623,16 +635,12 @@ class SyncHandler(BaseHandler):
logging.debug("Recents %r", batch) logging.debug("Recents %r", batch)
# TODO(mjark): This seems racy since this isn't being passed a state_events_at_leave = yield self.store.get_state_for_event(
# token to indicate what point in the stream this is leave_event.event_id
leave_state = yield self.store.get_state_for_events(
[leave_event.event_id], None
) )
state_events_at_leave = leave_state[leave_event.event_id].values() state_at_previous_sync = yield self.get_state_at(
leave_event.room_id, stream_position=since_token
state_at_previous_sync = yield self.get_state_at_previous_sync(
leave_event.room_id, since_token=since_token
) )
state_events_delta = yield self.compute_state_delta( state_events_delta = yield self.compute_state_delta(
@ -655,60 +663,77 @@ class SyncHandler(BaseHandler):
defer.returnValue(room_sync) defer.returnValue(room_sync)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_at_previous_sync(self, room_id, since_token): def get_state_after_event(self, event):
""" Get the room state at the previous sync the client made. """
Returns: Get the room state after the given event
A Deferred list of Events.
:param synapse.events.EventBase event: event of interest
:return: A Deferred map from ((type, state_key)->Event)
"""
state = yield self.store.get_state_for_event(event.event_id)
if event.is_state():
state = state.copy()
state[(event.type, event.state_key)] = event
defer.returnValue(state)
@defer.inlineCallbacks
def get_state_at(self, room_id, stream_position):
""" Get the room state at a particular stream position
:param str room_id: room for which to get state
:param StreamToken stream_position: point at which to get state
:returns: A Deferred map from ((type, state_key)->Event)
""" """
last_events, token = yield self.store.get_recent_events_for_room( last_events, token = yield self.store.get_recent_events_for_room(
room_id, end_token=since_token.room_key, limit=1, room_id, end_token=stream_position.room_key, limit=1,
) )
if last_events: if last_events:
last_event = last_events[0] last_event = last_events[-1]
last_context = yield self.state_handler.compute_event_context( state = yield self.get_state_after_event(last_event)
last_event
)
if last_event.is_state():
state = [last_event] + last_context.current_state.values()
else:
state = last_context.current_state.values()
else: else:
state = () # no events in this room - so presumably no state
state = {}
defer.returnValue(state) defer.returnValue(state)
def compute_state_delta(self, since_token, previous_state, current_state): def compute_state_delta(self, since_token, previous_state, current_state):
""" Works out the differnce in state between the current state and the """ Works out the differnce in state between the current state and the
state the client got when it last performed a sync. state the client got when it last performed a sync.
Returns:
A list of events. :param str since_token: the point we are comparing against
:param dict[(str,str), synapse.events.FrozenEvent] previous_state: the
state to compare to
:param dict[(str,str), synapse.events.FrozenEvent] current_state: the
new state
:returns A new event dictionary
""" """
# TODO(mjark) Check if the state events were received by the server # TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state # after the previous sync, since we need to include those state
# updates even if they occured logically before the previous event. # updates even if they occured logically before the previous event.
# TODO(mjark) Check for new redactions in the state events. # TODO(mjark) Check for new redactions in the state events.
previous_dict = {event.event_id: event for event in previous_state}
state_delta = [] state_delta = {}
for event in current_state: for key, event in current_state.iteritems():
if event.event_id not in previous_dict: if (key not in previous_state or
state_delta.append(event) previous_state[key].event_id != event.event_id):
state_delta[key] = event
return state_delta return state_delta
@defer.inlineCallbacks def check_joined_room(self, sync_config, state_delta):
def check_joined_room(self, sync_config, room_id, state_delta): """
joined = False Check if the user has just joined the given room (so should
limited = False be given the full state)
for event in state_delta:
if (
event.type == EventTypes.Member
and event.state_key == sync_config.user.to_string()
):
if event.content["membership"] == Membership.JOIN:
joined = True
if joined: :param sync_config:
res = yield self.state_handler.get_current_state(room_id) :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the
state_delta = res.values() difference in state since the last sync
limited = True
defer.returnValue((state_delta, limited)) :returns A deferred Tuple (state_delta, limited)
"""
join_event = state_delta.get((
EventTypes.Member, sync_config.user.to_string()), None)
if join_event is not None:
if join_event.content["membership"] == Membership.JOIN:
return True
return False

View file

@ -14,6 +14,8 @@
# limitations under the License. # limitations under the License.
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor, ObservableDeferred from synapse.util.async import run_on_reactor, ObservableDeferred
@ -346,9 +348,9 @@ class Notifier(object):
room_ids = [] room_ids = []
if is_guest: if is_guest:
# TODO(daniel): Deal with non-room events too
only_room_events = True
if guest_room_id: if guest_room_id:
if not self._is_world_readable(guest_room_id):
raise AuthError(403, "Guest access not allowed")
room_ids = [guest_room_id] room_ids = [guest_room_id]
else: else:
rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = yield self.store.get_rooms_for_user(user.to_string())
@ -361,6 +363,7 @@ class Notifier(object):
events = [] events = []
end_token = from_token end_token = from_token
for name, source in self.event_sources.sources.items(): for name, source in self.event_sources.sources.items():
keyname = "%s_key" % name keyname = "%s_key" % name
before_id = getattr(before_token, keyname) before_id = getattr(before_token, keyname)
@ -377,7 +380,7 @@ class Notifier(object):
room_ids=room_ids, room_ids=room_ids,
) )
if is_guest: if name == "room":
room_member_handler = self.hs.get_handlers().room_member_handler room_member_handler = self.hs.get_handlers().room_member_handler
new_events = yield room_member_handler._filter_events_for_client( new_events = yield room_member_handler._filter_events_for_client(
user.to_string(), user.to_string(),
@ -403,6 +406,17 @@ class Notifier(object):
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks
def _is_world_readable(self, room_id):
state = yield self.hs.get_state_handler().get_current_state(
room_id,
EventTypes.RoomHistoryVisibility
)
if state and "history_visibility" in state.content:
defer.returnValue(state.content["history_visibility"] == "world_readable")
else:
defer.returnValue(False)
@log_function @log_function
def remove_expired_streams(self): def remove_expired_streams(self):
time_now_ms = self.clock.time_msec() time_now_ms = self.clock.time_msec()

View file

@ -37,7 +37,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_PUT(self, request, user_id): def on_PUT(self, request, user_id):
auth_user, _, _ = yield self.auth.get_user_by_req(request) auth_user, _, _ = yield self.auth.get_user_by_req(request, allow_guest=True)
user = UserID.from_string(user_id) user = UserID.from_string(user_id)
try: try:

View file

@ -372,12 +372,13 @@ class RoomInitialSyncRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id): def on_GET(self, request, room_id):
user, _, _ = yield self.auth.get_user_by_req(request) user, _, is_guest = yield self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request) pagination_config = PaginationConfig.from_request(request)
content = yield self.handlers.message_handler.room_initial_sync( content = yield self.handlers.message_handler.room_initial_sync(
room_id=room_id, room_id=room_id,
user_id=user.to_string(), user_id=user.to_string(),
pagin_config=pagination_config, pagin_config=pagination_config,
is_guest=is_guest,
) )
defer.returnValue((200, content)) defer.returnValue((200, content))

View file

@ -20,6 +20,7 @@ from synapse.http.servlet import (
) )
from synapse.handlers.sync import SyncConfig from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken from synapse.types import StreamToken
from synapse.events import FrozenEvent
from synapse.events.utils import ( from synapse.events.utils import (
serialize_event, format_event_for_client_v2_without_event_id, serialize_event, format_event_for_client_v2_without_event_id,
) )
@ -165,6 +166,20 @@ class SyncRestServlet(RestServlet):
return {"events": filter.filter_presence(formatted)} return {"events": filter.filter_presence(formatted)}
def encode_joined(self, rooms, filter, time_now, token_id): def encode_joined(self, rooms, filter, time_now, token_id):
"""
Encode the joined rooms in a sync result
:param list[synapse.handlers.sync.JoinedSyncResult] rooms: list of sync
results for rooms this user is joined to
:param FilterCollection filter: filters to apply to the results
:param int time_now: current time - used as a baseline for age
calculations
:param int token_id: ID of the user's auth token - used for namespacing
of transaction IDs
:return: the joined rooms list, in our response format
:rtype: dict[str, dict[str, object]]
"""
joined = {} joined = {}
for room in rooms: for room in rooms:
joined[room.room_id] = self.encode_room( joined[room.room_id] = self.encode_room(
@ -174,6 +189,20 @@ class SyncRestServlet(RestServlet):
return joined return joined
def encode_invited(self, rooms, filter, time_now, token_id): def encode_invited(self, rooms, filter, time_now, token_id):
"""
Encode the invited rooms in a sync result
:param list[synapse.handlers.sync.InvitedSyncResult] rooms: list of
sync results for rooms this user is joined to
:param FilterCollection filter: filters to apply to the results
:param int time_now: current time - used as a baseline for age
calculations
:param int token_id: ID of the user's auth token - used for namespacing
of transaction IDs
:return: the invited rooms list, in our response format
:rtype: dict[str, dict[str, object]]
"""
invited = {} invited = {}
for room in rooms: for room in rooms:
invite = serialize_event( invite = serialize_event(
@ -189,6 +218,20 @@ class SyncRestServlet(RestServlet):
return invited return invited
def encode_archived(self, rooms, filter, time_now, token_id): def encode_archived(self, rooms, filter, time_now, token_id):
"""
Encode the archived rooms in a sync result
:param list[synapse.handlers.sync.ArchivedSyncResult] rooms: list of
sync results for rooms this user is joined to
:param FilterCollection filter: filters to apply to the results
:param int time_now: current time - used as a baseline for age
calculations
:param int token_id: ID of the user's auth token - used for namespacing
of transaction IDs
:return: the invited rooms list, in our response format
:rtype: dict[str, dict[str, object]]
"""
joined = {} joined = {}
for room in rooms: for room in rooms:
joined[room.room_id] = self.encode_room( joined[room.room_id] = self.encode_room(
@ -199,8 +242,28 @@ class SyncRestServlet(RestServlet):
@staticmethod @staticmethod
def encode_room(room, filter, time_now, token_id, joined=True): def encode_room(room, filter, time_now, token_id, joined=True):
"""
:param JoinedSyncResult|ArchivedSyncResult room: sync result for a
single room
:param FilterCollection filter: filters to apply to the results
:param int time_now: current time - used as a baseline for age
calculations
:param int token_id: ID of the user's auth token - used for namespacing
of transaction IDs
:param joined: True if the user is joined to this room - will mean
we handle ephemeral events
:return: the room, encoded in our response format
:rtype: dict[str, object]
"""
event_map = {} event_map = {}
state_events = filter.filter_room_state(room.state) state_dict = room.state
timeline_events = filter.filter_room_timeline(room.timeline.events)
state_dict = SyncRestServlet._rollback_state_for_timeline(
state_dict, timeline_events)
state_events = filter.filter_room_state(state_dict.values())
state_event_ids = [] state_event_ids = []
for event in state_events: for event in state_events:
# TODO(mjark): Respect formatting requirements in the filter. # TODO(mjark): Respect formatting requirements in the filter.
@ -210,7 +273,6 @@ class SyncRestServlet(RestServlet):
) )
state_event_ids.append(event.event_id) state_event_ids.append(event.event_id)
timeline_events = filter.filter_room_timeline(room.timeline.events)
timeline_event_ids = [] timeline_event_ids = []
for event in timeline_events: for event in timeline_events:
# TODO(mjark): Respect formatting requirements in the filter. # TODO(mjark): Respect formatting requirements in the filter.
@ -241,6 +303,63 @@ class SyncRestServlet(RestServlet):
return result return result
@staticmethod
def _rollback_state_for_timeline(state, timeline):
"""
Wind the state dictionary backwards, so that it represents the
state at the start of the timeline, rather than at the end.
:param dict[(str, str), synapse.events.EventBase] state: the
state dictionary. Will be updated to the state before the timeline.
:param list[synapse.events.EventBase] timeline: the event timeline
:return: updated state dictionary
"""
logger.debug("Processing state dict %r; timeline %r", state,
[e.get_dict() for e in timeline])
result = state.copy()
for timeline_event in reversed(timeline):
if not timeline_event.is_state():
continue
event_key = (timeline_event.type, timeline_event.state_key)
logger.debug("Considering %s for removal", event_key)
state_event = result.get(event_key)
if (state_event is None or
state_event.event_id != timeline_event.event_id):
# the event in the timeline isn't present in the state
# dictionary.
#
# the most likely cause for this is that there was a fork in
# the event graph, and the state is no longer valid. Really,
# the event shouldn't be in the timeline. We're going to ignore
# it for now, however.
logger.warn("Found state event %r in timeline which doesn't "
"match state dictionary", timeline_event)
continue
prev_event_id = timeline_event.unsigned.get("replaces_state", None)
logger.debug("Replacing %s with %s in state dict",
timeline_event.event_id, prev_event_id)
if prev_event_id is None:
del result[event_key]
else:
result[event_key] = FrozenEvent({
"type": timeline_event.type,
"state_key": timeline_event.state_key,
"content": timeline_event.unsigned['prev_content'],
"sender": timeline_event.unsigned['prev_sender'],
"event_id": prev_event_id,
"room_id": timeline_event.room_id,
})
logger.debug("New value: %r", result.get(event_key))
return result
def register_servlets(hs, http_server): def register_servlets(hs, http_server):
SyncRestServlet(hs).register(http_server) SyncRestServlet(hs).register(http_server)

View file

@ -71,7 +71,7 @@ class StateHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""): def get_current_state(self, room_id, event_type=None, state_key=""):
""" Returns the current state for the room as a list. This is done by """ Retrieves the current state for the room. This is done by
calling `get_latest_events_in_room` to get the leading edges of the calling `get_latest_events_in_room` to get the leading edges of the
event graph and then resolving any of the state conflicts. event graph and then resolving any of the state conflicts.
@ -80,6 +80,8 @@ class StateHandler(object):
If `event_type` is specified, then the method returns only the one If `event_type` is specified, then the method returns only the one
event (or None) with that `event_type` and `state_key`. event (or None) with that `event_type` and `state_key`.
:returns map from (type, state_key) to event
""" """
event_ids = yield self.store.get_latest_event_ids_in_room(room_id) event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
@ -177,9 +179,10 @@ class StateHandler(object):
""" Given a list of event_ids this method fetches the state at each """ Given a list of event_ids this method fetches the state at each
event, resolves conflicts between them and returns them. event, resolves conflicts between them and returns them.
Return format is a tuple: (`state_group`, `state_events`), where the :returns a Deferred tuple of (`state_group`, `state`, `prev_state`).
first is the name of a state group if one and only one is involved, `state_group` is the name of a state group if one and only one is
otherwise `None`. involved. `state` is a map from (type, state_key) to event, and
`prev_state` is a list of event ids.
""" """
logger.debug("resolve_state_groups event_ids %s", event_ids) logger.debug("resolve_state_groups event_ids %s", event_ids)
@ -255,6 +258,11 @@ class StateHandler(object):
return self._resolve_events(state_sets) return self._resolve_events(state_sets)
def _resolve_events(self, state_sets, event_type=None, state_key=""): def _resolve_events(self, state_sets, event_type=None, state_key=""):
"""
:returns a tuple (new_state, prev_states). new_state is a map
from (type, state_key) to event. prev_states is a list of event_ids.
:rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str])
"""
state = {} state = {}
for st in state_sets: for st in state_sets:
for e in st: for e in st:
@ -307,19 +315,23 @@ class StateHandler(object):
We resolve conflicts in the following order: We resolve conflicts in the following order:
1. power levels 1. power levels
2. memberships 2. join rules
3. other events. 3. memberships
4. other events.
""" """
resolved_state = {} resolved_state = {}
power_key = (EventTypes.PowerLevels, "") power_key = (EventTypes.PowerLevels, "")
if power_key in conflicted_state.items(): if power_key in conflicted_state:
power_levels = conflicted_state[power_key] events = conflicted_state[power_key]
resolved_state[power_key] = self._resolve_auth_events(power_levels) logger.debug("Resolving conflicted power levels %r", events)
resolved_state[power_key] = self._resolve_auth_events(
events, auth_events)
auth_events.update(resolved_state) auth_events.update(resolved_state)
for key, events in conflicted_state.items(): for key, events in conflicted_state.items():
if key[0] == EventTypes.JoinRules: if key[0] == EventTypes.JoinRules:
logger.debug("Resolving conflicted join rules %r", events)
resolved_state[key] = self._resolve_auth_events( resolved_state[key] = self._resolve_auth_events(
events, events,
auth_events auth_events
@ -329,6 +341,7 @@ class StateHandler(object):
for key, events in conflicted_state.items(): for key, events in conflicted_state.items():
if key[0] == EventTypes.Member: if key[0] == EventTypes.Member:
logger.debug("Resolving conflicted member lists %r", events)
resolved_state[key] = self._resolve_auth_events( resolved_state[key] = self._resolve_auth_events(
events, events,
auth_events auth_events
@ -338,6 +351,7 @@ class StateHandler(object):
for key, events in conflicted_state.items(): for key, events in conflicted_state.items():
if key not in resolved_state: if key not in resolved_state:
logger.debug("Resolving conflicted state %r:%r", key, events)
resolved_state[key] = self._resolve_normal_events( resolved_state[key] = self._resolve_normal_events(
events, auth_events events, auth_events
) )

View file

@ -831,7 +831,8 @@ class EventsStore(SQLBaseStore):
allow_none=True, allow_none=True,
) )
if prev: if prev:
ev.unsigned["prev_content"] = prev.get_dict()["content"] ev.unsigned["prev_content"] = prev.content
ev.unsigned["prev_sender"] = prev.sender
self._get_event_cache.prefill( self._get_event_cache.prefill(
(ev.event_id, check_redacted, get_prev_content), ev (ev.event_id, check_redacted, get_prev_content), ev
@ -888,7 +889,8 @@ class EventsStore(SQLBaseStore):
get_prev_content=False, get_prev_content=False,
) )
if prev: if prev:
ev.unsigned["prev_content"] = prev.get_dict()["content"] ev.unsigned["prev_content"] = prev.content
ev.unsigned["prev_sender"] = prev.sender
self._get_event_cache.prefill( self._get_event_cache.prefill(
(ev.event_id, check_redacted, get_prev_content), ev (ev.event_id, check_redacted, get_prev_content), ev

View file

@ -252,12 +252,23 @@ class SearchStore(BackgroundUpdateStore):
" WHERE vector @@ query AND room_id = ?" " WHERE vector @@ query AND room_id = ?"
) )
elif isinstance(self.database_engine, Sqlite3Engine): elif isinstance(self.database_engine, Sqlite3Engine):
# We use CROSS JOIN here to ensure we use the right indexes.
# https://sqlite.org/optoverview.html#crossjoin
#
# We want to use the full text search index on event_search to
# extract all possible matches first, then lookup those matches
# in the events table to get the topological ordering. We need
# to use the indexes in this order because sqlite refuses to
# MATCH unless it uses the full text search index
sql = ( sql = (
"SELECT rank(matchinfo(event_search)) as rank, room_id, event_id," "SELECT rank(matchinfo) as rank, room_id, event_id,"
" topological_ordering, stream_ordering" " topological_ordering, stream_ordering"
" FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
" FROM event_search" " FROM event_search"
" NATURAL JOIN events" " WHERE value MATCH ?"
" WHERE value MATCH ? AND room_id = ?" " )"
" CROSS JOIN events USING (event_id)"
" WHERE room_id = ?"
) )
else: else:
# This should be unreachable. # This should be unreachable.

View file

@ -237,6 +237,20 @@ class StateStore(SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids}) defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks
def get_state_for_event(self, event_id, types=None):
"""
Get the state dict corresponding to a particular event
:param str event_id: event whose state should be returned
:param list[(str, str)]|None types: List of (type, state_key) tuples
which are used to filter the state fetched. May be None, which
matches any key
:return: a deferred dict from (type, state_key) -> state_event
"""
state_map = yield self.get_state_for_events([event_id], types)
defer.returnValue(state_map[event_id])
@cached(num_args=2, lru=True, max_entries=10000) @cached(num_args=2, lru=True, max_entries=10000)
def _get_state_group_for_event(self, room_id, event_id): def _get_state_group_for_event(self, room_id, event_id):
return self._simple_select_one_onecol( return self._simple_select_one_onecol(

View file

@ -59,7 +59,7 @@ class TransactionStore(SQLBaseStore):
allow_none=True, allow_none=True,
) )
if result and result.response_code: if result and result["response_code"]:
return result["response_code"], result["response_json"] return result["response_code"], result["response_json"]
else: else:
return None return None

View file

@ -321,6 +321,9 @@ class PresenceEventStreamTestCase(unittest.TestCase):
hs.handlers.room_member_handler.get_room_members = ( hs.handlers.room_member_handler.get_room_members = (
lambda r: self.room_members if r == "a-room" else [] lambda r: self.room_members if r == "a-room" else []
) )
hs.handlers.room_member_handler._filter_events_for_client = (
lambda user_id, events, **kwargs: events
)
self.mock_datastore = hs.get_datastore() self.mock_datastore = hs.get_datastore()
self.mock_datastore.get_app_service_by_token = Mock(return_value=None) self.mock_datastore.get_app_service_by_token = Mock(return_value=None)

View file

@ -317,6 +317,99 @@ class StateTestCase(unittest.TestCase):
{e.event_id for e in context_store["E"].current_state.values()} {e.event_id for e in context_store["E"].current_state.values()}
) )
@defer.inlineCallbacks
def test_branch_have_perms_conflict(self):
userid1 = "@user_id:example.com"
userid2 = "@user_id2:example.com"
nodes = {
"A1": DictObj(
type=EventTypes.Create,
state_key="",
content={"creator": userid1},
depth=1,
),
"A2": DictObj(
type=EventTypes.Member,
state_key=userid1,
content={"membership": Membership.JOIN},
membership=Membership.JOIN,
),
"A3": DictObj(
type=EventTypes.Member,
state_key=userid2,
content={"membership": Membership.JOIN},
membership=Membership.JOIN,
),
"A4": DictObj(
type=EventTypes.PowerLevels,
state_key="",
content={
"events": {"m.room.name": 50},
"users": {userid1: 100,
userid2: 60},
},
),
"A5": DictObj(
type=EventTypes.Name,
state_key="",
),
"B": DictObj(
type=EventTypes.PowerLevels,
state_key="",
content={
"events": {"m.room.name": 50},
"users": {userid2: 30},
},
),
"C": DictObj(
type=EventTypes.Name,
state_key="",
sender=userid2,
),
"D": DictObj(
type=EventTypes.Message,
),
}
edges = {
"A2": ["A1"],
"A3": ["A2"],
"A4": ["A3"],
"A5": ["A4"],
"B": ["A5"],
"C": ["A5"],
"D": ["B", "C"]
}
self._add_depths(nodes, edges)
graph = Graph(nodes, edges)
store = StateGroupStore()
self.store.get_state_groups.side_effect = store.get_state_groups
context_store = {}
for event in graph.walk():
context = yield self.state.compute_event_context(event)
store.store_state_groups(event, context)
context_store[event.event_id] = context
self.assertSetEqual(
{"A1", "A2", "A3", "A5", "B"},
{e.event_id for e in context_store["D"].current_state.values()}
)
def _add_depths(self, nodes, edges):
def _get_depth(ev):
node = nodes[ev]
if 'depth' not in node:
prevs = edges[ev]
depth = max(_get_depth(prev) for prev in prevs) + 1
node['depth'] = depth
return node['depth']
for n in nodes:
_get_depth(n)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_annotate_with_old_message(self): def test_annotate_with_old_message(self):
event = create_event(type="test_message", name="event") event = create_event(type="test_message", name="event")