forked from MirrorHub/synapse
Add filter param to /messages API
This commit is contained in:
parent
d554ca5e1d
commit
a98d215204
4 changed files with 29 additions and 11 deletions
|
@ -66,7 +66,7 @@ class MessageHandler(BaseHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def get_messages(self, requester, room_id=None, pagin_config=None,
|
||||
as_client_event=True):
|
||||
as_client_event=True, event_filter=None):
|
||||
"""Get messages in a room.
|
||||
|
||||
Args:
|
||||
|
@ -75,11 +75,11 @@ class MessageHandler(BaseHandler):
|
|||
pagin_config (synapse.api.streams.PaginationConfig): The pagination
|
||||
config rules to apply, if any.
|
||||
as_client_event (bool): True to get events in client-server format.
|
||||
event_filter (Filter): Filter to apply to results or None
|
||||
Returns:
|
||||
dict: Pagination API results
|
||||
"""
|
||||
user_id = requester.user.to_string()
|
||||
data_source = self.hs.get_event_sources().sources["room"]
|
||||
|
||||
if pagin_config.from_token:
|
||||
room_token = pagin_config.from_token.room_key
|
||||
|
@ -129,8 +129,13 @@ class MessageHandler(BaseHandler):
|
|||
room_id, max_topo
|
||||
)
|
||||
|
||||
events, next_key = yield data_source.get_pagination_rows(
|
||||
requester.user, source_config, room_id
|
||||
events, next_key = yield self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=source_config.from_key,
|
||||
to_key=source_config.to_key,
|
||||
direction=source_config.direction,
|
||||
limit=source_config.limit,
|
||||
event_filter=event_filter,
|
||||
)
|
||||
|
||||
next_token = pagin_config.from_token.copy_and_replace(
|
||||
|
@ -144,6 +149,9 @@ class MessageHandler(BaseHandler):
|
|||
"end": next_token.to_string(),
|
||||
})
|
||||
|
||||
if event_filter:
|
||||
events = event_filter.filter(events)
|
||||
|
||||
events = yield filter_events_for_client(
|
||||
self.store,
|
||||
user_id,
|
||||
|
|
|
@ -20,12 +20,14 @@ from .base import ClientV1RestServlet, client_path_patterns
|
|||
from synapse.api.errors import SynapseError, Codes, AuthError
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.filtering import Filter
|
||||
from synapse.types import UserID, RoomID, RoomAlias
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
import ujson as json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -327,12 +329,19 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
|
|||
request, default_limit=10,
|
||||
)
|
||||
as_client_event = "raw" not in request.args
|
||||
filter_bytes = request.args.get("filter", None)
|
||||
if filter_bytes:
|
||||
filter_json = urllib.unquote(filter_bytes[-1]).decode("UTF-8")
|
||||
event_filter = Filter(json.loads(filter_json))
|
||||
else:
|
||||
event_filter = None
|
||||
handler = self.handlers.message_handler
|
||||
msgs = yield handler.get_messages(
|
||||
room_id=room_id,
|
||||
requester=requester,
|
||||
pagin_config=pagination_config,
|
||||
as_client_event=as_client_event
|
||||
as_client_event=as_client_event,
|
||||
event_filter=event_filter,
|
||||
)
|
||||
|
||||
defer.returnValue((200, msgs))
|
||||
|
|
|
@ -30,6 +30,7 @@ class EventInjector:
|
|||
def create_room(self, room):
|
||||
builder = self.event_builder_factory.new({
|
||||
"type": EventTypes.Create,
|
||||
"sender": "",
|
||||
"room_id": room.to_string(),
|
||||
"content": {},
|
||||
})
|
||||
|
|
|
@ -37,7 +37,7 @@ class EventsStoreTestCase(unittest.TestCase):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def test_count_daily_messages(self):
|
||||
self.db_pool.runQuery("DELETE FROM stats_reporting")
|
||||
yield self.db_pool.runQuery("DELETE FROM stats_reporting")
|
||||
|
||||
self.hs.clock.now = 100
|
||||
|
||||
|
@ -60,7 +60,7 @@ class EventsStoreTestCase(unittest.TestCase):
|
|||
# it isn't old enough.
|
||||
count = yield self.store.count_daily_messages()
|
||||
self.assertIsNone(count)
|
||||
self._assert_stats_reporting(1, self.hs.clock.now)
|
||||
yield self._assert_stats_reporting(1, self.hs.clock.now)
|
||||
|
||||
# Already reported yesterday, two new events from today.
|
||||
yield self.event_injector.inject_message(room, user, "Yeah they are!")
|
||||
|
@ -68,21 +68,21 @@ class EventsStoreTestCase(unittest.TestCase):
|
|||
self.hs.clock.now += 60 * 60 * 24
|
||||
count = yield self.store.count_daily_messages()
|
||||
self.assertEqual(2, count) # 2 since yesterday
|
||||
self._assert_stats_reporting(3, self.hs.clock.now) # 3 ever
|
||||
yield self._assert_stats_reporting(3, self.hs.clock.now) # 3 ever
|
||||
|
||||
# Last reported too recently.
|
||||
yield self.event_injector.inject_message(room, user, "Who could disagree?")
|
||||
self.hs.clock.now += 60 * 60 * 22
|
||||
count = yield self.store.count_daily_messages()
|
||||
self.assertIsNone(count)
|
||||
self._assert_stats_reporting(4, self.hs.clock.now)
|
||||
yield self._assert_stats_reporting(4, self.hs.clock.now)
|
||||
|
||||
# Last reported too long ago
|
||||
yield self.event_injector.inject_message(room, user, "No one.")
|
||||
self.hs.clock.now += 60 * 60 * 26
|
||||
count = yield self.store.count_daily_messages()
|
||||
self.assertIsNone(count)
|
||||
self._assert_stats_reporting(5, self.hs.clock.now)
|
||||
yield self._assert_stats_reporting(5, self.hs.clock.now)
|
||||
|
||||
# And now let's actually report something
|
||||
yield self.event_injector.inject_message(room, user, "Indeed.")
|
||||
|
@ -92,7 +92,7 @@ class EventsStoreTestCase(unittest.TestCase):
|
|||
self.hs.clock.now += (60 * 60 * 24) + 50
|
||||
count = yield self.store.count_daily_messages()
|
||||
self.assertEqual(3, count)
|
||||
self._assert_stats_reporting(8, self.hs.clock.now)
|
||||
yield self._assert_stats_reporting(8, self.hs.clock.now)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_last_stream_token(self):
|
||||
|
|
Loading…
Reference in a new issue