Merge branch 'develop' into markjh/room_tags

This commit is contained in:
Mark Haines 2015-11-02 10:57:00 +00:00
commit 0e36756383
11 changed files with 339 additions and 30 deletions

View file

@ -154,7 +154,8 @@ def serialize_event(e, time_now_ms, as_client_event=True,
if "redacted_because" in e.unsigned: if "redacted_because" in e.unsigned:
d["unsigned"]["redacted_because"] = serialize_event( d["unsigned"]["redacted_because"] = serialize_event(
e.unsigned["redacted_because"], time_now_ms e.unsigned["redacted_because"], time_now_ms,
event_format=event_format
) )
if token_id is not None: if token_id is not None:

View file

@ -17,7 +17,7 @@ from synapse.appservice.scheduler import AppServiceScheduler
from synapse.appservice.api import ApplicationServiceApi from synapse.appservice.api import ApplicationServiceApi
from .register import RegistrationHandler from .register import RegistrationHandler
from .room import ( from .room import (
RoomCreationHandler, RoomMemberHandler, RoomListHandler RoomCreationHandler, RoomMemberHandler, RoomListHandler, RoomContextHandler,
) )
from .message import MessageHandler from .message import MessageHandler
from .events import EventStreamHandler, EventHandler from .events import EventStreamHandler, EventHandler
@ -70,3 +70,4 @@ class Handlers(object):
self.auth_handler = AuthHandler(hs) self.auth_handler = AuthHandler(hs)
self.identity_handler = IdentityHandler(hs) self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs) self.search_handler = SearchHandler(hs)
self.room_context_handler = RoomContextHandler(hs)

View file

@ -33,6 +33,7 @@ from collections import OrderedDict
from unpaddedbase64 import decode_base64 from unpaddedbase64 import decode_base64
import logging import logging
import math
import string import string
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -747,6 +748,60 @@ class RoomListHandler(BaseHandler):
defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
class RoomContextHandler(BaseHandler):
@defer.inlineCallbacks
def get_event_context(self, user, room_id, event_id, limit):
"""Retrieves events, pagination tokens and state around a given event
in a room.
Args:
user (UserID)
room_id (str)
event_id (str)
limit (int): The maximum number of events to return in total
(excluding state).
Returns:
dict
"""
before_limit = math.floor(limit/2.)
after_limit = limit - before_limit
now_token = yield self.hs.get_event_sources().get_current_token()
results = yield self.store.get_events_around(
room_id, event_id, before_limit, after_limit
)
results["events_before"] = yield self._filter_events_for_client(
user.to_string(), results["events_before"]
)
results["events_after"] = yield self._filter_events_for_client(
user.to_string(), results["events_after"]
)
if results["events_after"]:
last_event_id = results["events_after"][-1].event_id
else:
last_event_id = event_id
state = yield self.store.get_state_for_events(
[last_event_id], None
)
results["state"] = state[last_event_id].values()
results["start"] = now_token.copy_and_replace(
"room_key", results["start"]
).to_string()
results["end"] = now_token.copy_and_replace(
"room_key", results["end"]
).to_string()
defer.returnValue(results)
class RoomEventSource(object): class RoomEventSource(object):
def __init__(self, hs): def __init__(self, hs):
self.store = hs.get_datastore() self.store = hs.get_datastore()

View file

@ -51,6 +51,17 @@ class SearchHandler(BaseHandler):
"content.body", "content.name", "content.topic", "content.body", "content.name", "content.topic",
]) ])
filter_dict = content["search_categories"]["room_events"].get("filter", {}) filter_dict = content["search_categories"]["room_events"].get("filter", {})
event_context = content["search_categories"]["room_events"].get(
"event_context", None
)
if event_context is not None:
before_limit = int(event_context.get(
"before_limit", 5
))
after_limit = int(event_context.get(
"after_limit", 5
))
except KeyError: except KeyError:
raise SynapseError(400, "Invalid search query") raise SynapseError(400, "Invalid search query")
@ -76,14 +87,57 @@ class SearchHandler(BaseHandler):
user.to_string(), filtered_events user.to_string(), filtered_events
) )
allowed_events.sort(key=lambda e: -rank_map[e.event_id])
allowed_events = allowed_events[:search_filter.limit()]
if event_context is not None:
now_token = yield self.hs.get_event_sources().get_current_token()
contexts = {}
for event in allowed_events:
res = yield self.store.get_events_around(
event.room_id, event.event_id, before_limit, after_limit
)
res["events_before"] = yield self._filter_events_for_client(
user.to_string(), res["events_before"]
)
res["events_after"] = yield self._filter_events_for_client(
user.to_string(), res["events_after"]
)
res["start"] = now_token.copy_and_replace(
"room_key", res["start"]
).to_string()
res["end"] = now_token.copy_and_replace(
"room_key", res["end"]
).to_string()
contexts[event.event_id] = res
else:
contexts = {}
# TODO: Add a limit # TODO: Add a limit
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
for context in contexts.values():
context["events_before"] = [
serialize_event(e, time_now)
for e in context["events_before"]
]
context["events_after"] = [
serialize_event(e, time_now)
for e in context["events_after"]
]
results = { results = {
e.event_id: { e.event_id: {
"rank": rank_map[e.event_id], "rank": rank_map[e.event_id],
"result": serialize_event(e, time_now) "result": serialize_event(e, time_now),
"context": contexts.get(e.event_id, {}),
} }
for e in allowed_events for e in allowed_events
} }

View file

@ -113,15 +113,20 @@ class SyncHandler(BaseHandler):
self.clock = hs.get_clock() self.clock = hs.get_clock()
@defer.inlineCallbacks @defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise """Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result. return an empty sync result.
Returns: Returns:
A Deferred SyncResult. A Deferred SyncResult.
""" """
if timeout == 0 or since_token is None:
result = yield self.current_sync_for_user(sync_config, since_token) if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = yield self.current_sync_for_user(sync_config, since_token,
full_state=full_state)
defer.returnValue(result) defer.returnValue(result)
else: else:
def current_sync_callback(before_token, after_token): def current_sync_callback(before_token, after_token):
@ -146,19 +151,24 @@ class SyncHandler(BaseHandler):
) )
defer.returnValue(result) defer.returnValue(result)
def current_sync_for_user(self, sync_config, since_token=None): def current_sync_for_user(self, sync_config, since_token=None,
full_state=False):
"""Get the sync for client needed to match what the server has now. """Get the sync for client needed to match what the server has now.
Returns: Returns:
A Deferred SyncResult. A Deferred SyncResult.
""" """
if since_token is None: if since_token is None or full_state:
return self.initial_sync(sync_config) return self.full_state_sync(sync_config, since_token)
else: else:
return self.incremental_sync_with_gap(sync_config, since_token) return self.incremental_sync_with_gap(sync_config, since_token)
@defer.inlineCallbacks @defer.inlineCallbacks
def initial_sync(self, sync_config): def full_state_sync(self, sync_config, timeline_since_token):
"""Get a sync for a client which is starting without any state """Get a sync for a client which is starting without any state.
If a 'message_since_token' is given, only timeline events which have
happened since that token will be returned.
Returns: Returns:
A Deferred SyncResult. A Deferred SyncResult.
""" """
@ -192,8 +202,12 @@ class SyncHandler(BaseHandler):
archived = [] archived = []
for event in room_list: for event in room_list:
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
room_sync = yield self.initial_sync_for_joined_room( room_sync = yield self.full_state_sync_for_joined_room(
event.room_id, sync_config, now_token, typing_by_room room_id=event.room_id,
sync_config=sync_config,
now_token=now_token,
timeline_since_token=timeline_since_token,
typing_by_room=typing_by_room
) )
joined.append(room_sync) joined.append(room_sync)
elif event.membership == Membership.INVITE: elif event.membership == Membership.INVITE:
@ -206,11 +220,12 @@ class SyncHandler(BaseHandler):
leave_token = now_token.copy_and_replace( leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,) "room_key", "s%d" % (event.stream_ordering,)
) )
room_sync = yield self.initial_sync_for_archived_room( room_sync = yield self.full_state_sync_for_archived_room(
sync_config=sync_config, sync_config=sync_config,
room_id=event.room_id, room_id=event.room_id,
leave_event_id=event.event_id, leave_event_id=event.event_id,
leave_token=leave_token, leave_token=leave_token,
timeline_since_token=timeline_since_token,
) )
archived.append(room_sync) archived.append(room_sync)
@ -223,7 +238,8 @@ class SyncHandler(BaseHandler):
)) ))
@defer.inlineCallbacks @defer.inlineCallbacks
def initial_sync_for_joined_room(self, room_id, sync_config, now_token, def full_state_sync_for_joined_room(self, room_id, sync_config,
now_token, timeline_since_token,
typing_by_room): typing_by_room):
"""Sync a room for a client which is starting without any state """Sync a room for a client which is starting without any state
Returns: Returns:
@ -231,7 +247,7 @@ class SyncHandler(BaseHandler):
""" """
batch = yield self.load_filtered_recents( batch = yield self.load_filtered_recents(
room_id, sync_config, now_token, room_id, sync_config, now_token, since_token=timeline_since_token
) )
current_state = yield self.state_handler.get_current_state( current_state = yield self.state_handler.get_current_state(
@ -278,15 +294,16 @@ class SyncHandler(BaseHandler):
defer.returnValue((now_token, typing_by_room)) defer.returnValue((now_token, typing_by_room))
@defer.inlineCallbacks @defer.inlineCallbacks
def initial_sync_for_archived_room(self, room_id, sync_config, def full_state_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token): leave_event_id, leave_token,
timeline_since_token):
"""Sync a room for a client which is starting without any state """Sync a room for a client which is starting without any state
Returns: Returns:
A Deferred JoinedSyncResult. A Deferred JoinedSyncResult.
""" """
batch = yield self.load_filtered_recents( batch = yield self.load_filtered_recents(
room_id, sync_config, leave_token, room_id, sync_config, leave_token, since_token=timeline_since_token
) )
leave_state = yield self.store.get_state_for_events( leave_state = yield self.store.get_state_for_events(
@ -370,7 +387,7 @@ class SyncHandler(BaseHandler):
else: else:
prev_batch = now_token prev_batch = now_token
state = yield self.check_joined_room( state, limited = yield self.check_joined_room(
sync_config, room_id, state sync_config, room_id, state
) )
@ -379,7 +396,7 @@ class SyncHandler(BaseHandler):
timeline=TimelineBatch( timeline=TimelineBatch(
events=recents, events=recents,
prev_batch=prev_batch, prev_batch=prev_batch,
limited=False, limited=limited,
), ),
state=state, state=state,
ephemeral=typing_by_room.get(room_id, []) ephemeral=typing_by_room.get(room_id, [])
@ -503,7 +520,7 @@ class SyncHandler(BaseHandler):
current_state=current_state_events, current_state=current_state_events,
) )
state_events_delta = yield self.check_joined_room( state_events_delta, _ = yield self.check_joined_room(
sync_config, room_id, state_events_delta sync_config, room_id, state_events_delta
) )
@ -610,6 +627,7 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def check_joined_room(self, sync_config, room_id, state_delta): def check_joined_room(self, sync_config, room_id, state_delta):
joined = False joined = False
limited = False
for event in state_delta: for event in state_delta:
if ( if (
event.type == EventTypes.Member event.type == EventTypes.Member
@ -621,5 +639,6 @@ class SyncHandler(BaseHandler):
if joined: if joined:
res = yield self.state_handler.get_current_state(room_id) res = yield self.state_handler.get_current_state(room_id)
state_delta = res.values() state_delta = res.values()
limited = True
defer.returnValue(state_delta) defer.returnValue((state_delta, limited))

View file

@ -397,6 +397,41 @@ class RoomTriggerBackfill(ClientV1RestServlet):
defer.returnValue((200, res)) defer.returnValue((200, res))
class RoomEventContext(ClientV1RestServlet):
PATTERN = client_path_pattern(
"/rooms/(?P<room_id>[^/]*)/context/(?P<event_id>[^/]*)$"
)
def __init__(self, hs):
super(RoomEventContext, self).__init__(hs)
self.clock = hs.get_clock()
@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
user, _ = yield self.auth.get_user_by_req(request)
limit = int(request.args.get("limit", [10])[0])
results = yield self.handlers.room_context_handler.get_event_context(
user, room_id, event_id, limit,
)
time_now = self.clock.time_msec()
results["events_before"] = [
serialize_event(event, time_now) for event in results["events_before"]
]
results["events_after"] = [
serialize_event(event, time_now) for event in results["events_after"]
]
results["state"] = [
serialize_event(event, time_now) for event in results["state"]
]
logger.info("Responding with %r", results)
defer.returnValue((200, results))
# TODO: Needs unit testing # TODO: Needs unit testing
class RoomMembershipRestServlet(ClientV1RestServlet): class RoomMembershipRestServlet(ClientV1RestServlet):
@ -628,3 +663,4 @@ def register_servlets(hs, http_server):
RoomRedactEventRestServlet(hs).register(http_server) RoomRedactEventRestServlet(hs).register(http_server)
RoomTypingRestServlet(hs).register(http_server) RoomTypingRestServlet(hs).register(http_server)
SearchRestServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server)
RoomEventContext(hs).register(http_server)

View file

@ -16,7 +16,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.http.servlet import ( from synapse.http.servlet import (
RestServlet, parse_string, parse_integer RestServlet, parse_string, parse_integer, parse_boolean
) )
from synapse.handlers.sync import SyncConfig from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken from synapse.types import StreamToken
@ -90,6 +90,7 @@ class SyncRestServlet(RestServlet):
allowed_values=self.ALLOWED_PRESENCE allowed_values=self.ALLOWED_PRESENCE
) )
filter_id = parse_string(request, "filter", default=None) filter_id = parse_string(request, "filter", default=None)
full_state = parse_boolean(request, "full_state", default=False)
logger.info( logger.info(
"/sync: user=%r, timeout=%r, since=%r," "/sync: user=%r, timeout=%r, since=%r,"
@ -120,7 +121,8 @@ class SyncRestServlet(RestServlet):
try: try:
sync_result = yield self.sync_handler.wait_for_sync_for_user( sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, since_token=since_token, timeout=timeout sync_config, since_token=since_token, timeout=timeout,
full_state=full_state
) )
finally: finally:
if set_presence == "online": if set_presence == "online":

View file

@ -23,7 +23,7 @@ paginate bacwards.
This is implemented by keeping two ordering columns: stream_ordering and This is implemented by keeping two ordering columns: stream_ordering and
topological_ordering. Stream ordering is basically insertion/received order topological_ordering. Stream ordering is basically insertion/received order
(except for events from backfill requests). The topolgical_ordering is a (except for events from backfill requests). The topological_ordering is a
weak ordering of events based on the pdu graph. weak ordering of events based on the pdu graph.
This means that we have to have two different types of tokens, depending on This means that we have to have two different types of tokens, depending on
@ -436,3 +436,138 @@ class StreamStore(SQLBaseStore):
internal = event.internal_metadata internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1)) internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream)) internal.after = str(RoomStreamToken(topo, stream))
@defer.inlineCallbacks
def get_events_around(self, room_id, event_id, before_limit, after_limit):
"""Retrieve events and pagination tokens around a given event in a
room.
Args:
room_id (str)
event_id (str)
before_limit (int)
after_limit (int)
Returns:
dict
"""
results = yield self.runInteraction(
"get_events_around", self._get_events_around_txn,
room_id, event_id, before_limit, after_limit
)
events_before = yield self._get_events(
[e for e in results["before"]["event_ids"]],
get_prev_content=True
)
events_after = yield self._get_events(
[e for e in results["after"]["event_ids"]],
get_prev_content=True
)
defer.returnValue({
"events_before": events_before,
"events_after": events_after,
"start": results["before"]["token"],
"end": results["after"]["token"],
})
def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
"""Retrieves event_ids and pagination tokens around a given event in a
room.
Args:
room_id (str)
event_id (str)
before_limit (int)
after_limit (int)
Returns:
dict
"""
results = self._simple_select_one_txn(
txn,
"events",
keyvalues={
"event_id": event_id,
"room_id": room_id,
},
retcols=["stream_ordering", "topological_ordering"],
)
stream_ordering = results["stream_ordering"]
topological_ordering = results["topological_ordering"]
query_before = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND (topological_ordering < ?"
" OR (topological_ordering = ? AND stream_ordering < ?))"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
)
query_after = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" WHERE room_id = ? AND (topological_ordering > ?"
" OR (topological_ordering = ? AND stream_ordering > ?))"
" ORDER BY topological_ordering ASC, stream_ordering ASC"
" LIMIT ?"
)
txn.execute(
query_before,
(
room_id, topological_ordering, topological_ordering,
stream_ordering, before_limit,
)
)
rows = self.cursor_to_dict(txn)
events_before = [r["event_id"] for r in rows]
if rows:
start_token = str(RoomStreamToken(
rows[0]["topological_ordering"],
rows[0]["stream_ordering"] - 1,
))
else:
start_token = str(RoomStreamToken(
topological_ordering,
stream_ordering - 1,
))
txn.execute(
query_after,
(
room_id, topological_ordering, topological_ordering,
stream_ordering, after_limit,
)
)
rows = self.cursor_to_dict(txn)
events_after = [r["event_id"] for r in rows]
if rows:
end_token = str(RoomStreamToken(
rows[-1]["topological_ordering"],
rows[-1]["stream_ordering"],
))
else:
end_token = str(RoomStreamToken(
topological_ordering,
stream_ordering,
))
return {
"before": {
"event_ids": events_before,
"token": start_token,
},
"after": {
"event_ids": events_after,
"token": end_token,
},
}

View file

@ -47,7 +47,7 @@ class DomainSpecificString(
@classmethod @classmethod
def from_string(cls, s): def from_string(cls, s):
"""Parse the string given by 's' into a structure object.""" """Parse the string given by 's' into a structure object."""
if s[0] != cls.SIGIL: if len(s) < 1 or s[0] != cls.SIGIL:
raise SynapseError(400, "Expected %s string to start with '%s'" % ( raise SynapseError(400, "Expected %s string to start with '%s'" % (
cls.__name__, cls.SIGIL, cls.__name__, cls.SIGIL,
)) ))

View file

@ -15,13 +15,14 @@
from tests import unittest from tests import unittest
from synapse.api.errors import SynapseError
from synapse.server import BaseHomeServer from synapse.server import BaseHomeServer
from synapse.types import UserID, RoomAlias from synapse.types import UserID, RoomAlias
mock_homeserver = BaseHomeServer(hostname="my.domain") mock_homeserver = BaseHomeServer(hostname="my.domain")
class UserIDTestCase(unittest.TestCase):
class UserIDTestCase(unittest.TestCase):
def test_parse(self): def test_parse(self):
user = UserID.from_string("@1234abcd:my.domain") user = UserID.from_string("@1234abcd:my.domain")
@ -29,6 +30,11 @@ class UserIDTestCase(unittest.TestCase):
self.assertEquals("my.domain", user.domain) self.assertEquals("my.domain", user.domain)
self.assertEquals(True, mock_homeserver.is_mine(user)) self.assertEquals(True, mock_homeserver.is_mine(user))
def test_pase_empty(self):
with self.assertRaises(SynapseError):
UserID.from_string("")
def test_build(self): def test_build(self):
user = UserID("5678efgh", "my.domain") user = UserID("5678efgh", "my.domain")
@ -44,7 +50,6 @@ class UserIDTestCase(unittest.TestCase):
class RoomAliasTestCase(unittest.TestCase): class RoomAliasTestCase(unittest.TestCase):
def test_parse(self): def test_parse(self):
room = RoomAlias.from_string("#channel:my.domain") room = RoomAlias.from_string("#channel:my.domain")

View file

@ -19,6 +19,7 @@ commands =
check-manifest check-manifest
[testenv:pep8] [testenv:pep8]
skip_install = True
basepython = python2.7 basepython = python2.7
deps = deps =
flake8 flake8