WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model.

This commit is contained in:
Erik Johnston 2014-08-26 18:57:46 +01:00
parent 8885c8546c
commit 3a2a5b959c
16 changed files with 432 additions and 650 deletions

7
.gitignore vendored
View file

@ -17,3 +17,10 @@ htmlcov
demo/*.db
demo/*.log
demo/*.pid
graph/*.svg
graph/*.png
graph/*.dot
uploads

View file

@ -1,196 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.constants import Membership
from synapse.api.events.room import RoomMemberEvent
from synapse.api.streams.event import EventsStreamData
from twisted.internet import defer
from twisted.internet import reactor
import logging
logger = logging.getLogger(__name__)
class Notifier(object):
def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.stored_event_listeners = {}
@defer.inlineCallbacks
def on_new_room_event(self, event, store_id):
"""Called when there is a new room event which may potentially be sent
down listening users' event streams.
This function looks for interested *users* who may want to be notified
for this event. This is different to users requesting from the event
stream which looks for interested *events* for this user.
Args:
event (SynapseEvent): The new event, which must have a room_id
store_id (int): The ID of this event after it was stored with the
data store.
'"""
member_list = yield self.store.get_room_members(room_id=event.room_id,
membership="join")
if not member_list:
member_list = []
member_list = [u.user_id for u in member_list]
# invites MUST prod the person being invited, who won't be in the room.
if (event.type == RoomMemberEvent.TYPE and
event.content["membership"] == Membership.INVITE):
member_list.append(event.state_key)
# similarly, LEAVEs must be sent to the person leaving
if (event.type == RoomMemberEvent.TYPE and
event.content["membership"] == Membership.LEAVE):
member_list.append(event.state_key)
for user_id in member_list:
if user_id in self.stored_event_listeners:
self._notify_and_callback(
user_id=user_id,
event_data=event.get_dict(),
stream_type=EventsStreamData.EVENT_TYPE,
store_id=store_id)
def on_new_user_event(self, user_id, event_data, stream_type, store_id):
if user_id in self.stored_event_listeners:
self._notify_and_callback(
user_id=user_id,
event_data=event_data,
stream_type=stream_type,
store_id=store_id
)
def _notify_and_callback(self, user_id, event_data, stream_type, store_id):
logger.debug(
"Notifying %s of a new event.",
user_id
)
stream_ids = list(self.stored_event_listeners[user_id])
for stream_id in stream_ids:
self._notify_and_callback_stream(user_id, stream_id, event_data,
stream_type, store_id)
if not self.stored_event_listeners[user_id]:
del self.stored_event_listeners[user_id]
def _notify_and_callback_stream(self, user_id, stream_id, event_data,
stream_type, store_id):
event_listener = self.stored_event_listeners[user_id].pop(stream_id)
return_event_object = {
k: event_listener[k] for k in ["start", "chunk", "end"]
}
# work out the new end token
token = event_listener["start"]
end = self._next_token(stream_type, store_id, token)
return_event_object["end"] = end
# add the event to the chunk
chunk = event_listener["chunk"]
chunk.append(event_data)
# callback the defer. We know this can't have been resolved before as
# we always remove the event_listener from the map before resolving.
event_listener["defer"].callback(return_event_object)
def _next_token(self, stream_type, store_id, current_token):
stream_handler = self.hs.get_handlers().event_stream_handler
return stream_handler.get_event_stream_token(
stream_type,
store_id,
current_token
)
def store_events_for(self, user_id=None, stream_id=None, from_tok=None):
"""Store all incoming events for this user. This should be paired with
get_events_for to return chunked data.
Args:
user_id (str): The user to monitor incoming events for.
stream (object): The stream that is receiving events
from_tok (str): The token to monitor incoming events from.
"""
event_listener = {
"start": from_tok,
"chunk": [],
"end": from_tok,
"defer": defer.Deferred(),
}
if user_id not in self.stored_event_listeners:
self.stored_event_listeners[user_id] = {stream_id: event_listener}
else:
self.stored_event_listeners[user_id][stream_id] = event_listener
def purge_events_for(self, user_id=None, stream_id=None):
"""Purges any stored events for this user.
Args:
user_id (str): The user to purge stored events for.
"""
try:
del self.stored_event_listeners[user_id][stream_id]
if not self.stored_event_listeners[user_id]:
del self.stored_event_listeners[user_id]
except KeyError:
pass
def get_events_for(self, user_id=None, stream_id=None, timeout=0):
"""Retrieve stored events for this user, waiting if necessary.
It is advisable to wrap this call in a maybeDeferred.
Args:
user_id (str): The user to get events for.
timeout (int): The time in seconds to wait before giving up.
Returns:
A Deferred or a dict containing the chunk data, depending on if
there was data to return yet. The Deferred callback may be None if
there were no events before the timeout expired.
"""
logger.debug("%s is listening for events.", user_id)
try:
streams = self.stored_event_listeners[user_id][stream_id]["chunk"]
if streams:
logger.debug("%s returning existing chunk.", user_id)
return streams
except KeyError:
return None
reactor.callLater(
(timeout / 1000.0), self._timeout, user_id, stream_id
)
return self.stored_event_listeners[user_id][stream_id]["defer"]
def _timeout(self, user_id, stream_id):
try:
# We remove the event_listener from the map so that we can't
# resolve the deferred twice.
event_listeners = self.stored_event_listeners[user_id]
event_listener = event_listeners.pop(stream_id)
event_listener["defer"].callback(None)
logger.debug("%s event listening timed out.", user_id)
except KeyError:
pass

View file

@ -1,194 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains classes for streaming from the event stream: /events.
"""
from twisted.internet import defer
from synapse.api.errors import EventStreamError
from synapse.api.events import SynapseEvent
from synapse.api.streams import PaginationStream, StreamData
import logging
logger = logging.getLogger(__name__)
class EventsStreamData(StreamData):
EVENT_TYPE = "EventsStream"
def __init__(self, hs, room_id=None, feedback=False):
super(EventsStreamData, self).__init__(hs)
self.room_id = room_id
self.with_feedback = feedback
@defer.inlineCallbacks
def get_rows(self, user_id, from_key, to_key, limit, direction):
data, latest_ver = yield self.store.get_room_events(
user_id=user_id,
from_key=from_key,
to_key=to_key,
limit=limit,
room_id=self.room_id,
with_feedback=self.with_feedback
)
defer.returnValue((data, latest_ver))
@defer.inlineCallbacks
def max_token(self):
val = yield self.store.get_room_events_max_id()
defer.returnValue(val)
class EventStream(PaginationStream):
SEPARATOR = '_'
def __init__(self, user_id, stream_data_list):
super(EventStream, self).__init__()
self.user_id = user_id
self.stream_data = stream_data_list
@defer.inlineCallbacks
def fix_tokens(self, pagination_config):
pagination_config.from_tok = yield self.fix_token(
pagination_config.from_tok)
pagination_config.to_tok = yield self.fix_token(
pagination_config.to_tok)
if (
not pagination_config.to_tok
and pagination_config.direction == 'f'
):
pagination_config.to_tok = yield self.get_current_max_token()
logger.debug("pagination_config: %s", pagination_config)
defer.returnValue(pagination_config)
@defer.inlineCallbacks
def fix_token(self, token):
"""Fixes unknown values in a token to known values.
Args:
token (str): The token to fix up.
Returns:
The fixed-up token, which may == token.
"""
if token == PaginationStream.TOK_END:
new_token = yield self.get_current_max_token()
logger.debug("fix_token: From %s to %s", token, new_token)
token = new_token
defer.returnValue(token)
@defer.inlineCallbacks
def get_current_max_token(self):
new_token_parts = []
for s in self.stream_data:
mx = yield s.max_token()
new_token_parts.append(str(mx))
new_token = EventStream.SEPARATOR.join(new_token_parts)
logger.debug("get_current_max_token: %s", new_token)
defer.returnValue(new_token)
@defer.inlineCallbacks
def get_chunk(self, config):
# no support for limit on >1 streams, makes no sense.
if config.limit and len(self.stream_data) > 1:
raise EventStreamError(
400, "Limit not supported on multiplexed streams."
)
chunk_data, next_tok = yield self._get_chunk_data(
config.from_tok,
config.to_tok,
config.limit,
config.direction,
)
defer.returnValue({
"chunk": chunk_data,
"start": config.from_tok,
"end": next_tok
})
@defer.inlineCallbacks
def _get_chunk_data(self, from_tok, to_tok, limit, direction):
""" Get event data between the two tokens.
Tokens are SEPARATOR separated values representing pkey values of
certain tables, and the position determines the StreamData invoked
according to the STREAM_DATA list.
The magic value '-1' can be used to get the latest value.
Args:
from_tok - The token to start from.
to_tok - The token to end at. Must have values > from_tok or be -1.
Returns:
A list of event data.
Raises:
EventStreamError if something went wrong.
"""
# sanity check
if to_tok is not None:
if (from_tok.count(EventStream.SEPARATOR) !=
to_tok.count(EventStream.SEPARATOR) or
(from_tok.count(EventStream.SEPARATOR) + 1) !=
len(self.stream_data)):
raise EventStreamError(400, "Token lengths don't match.")
chunk = []
next_ver = []
for i, (from_pkey, to_pkey) in enumerate(zip(
self._split_token(from_tok),
self._split_token(to_tok)
)):
if from_pkey == to_pkey:
# tokens are the same, we have nothing to do.
next_ver.append(str(to_pkey))
continue
(event_chunk, max_pkey) = yield self.stream_data[i].get_rows(
self.user_id, from_pkey, to_pkey, limit, direction,
)
chunk.extend([
e.get_dict() if isinstance(e, SynapseEvent) else e
for e in event_chunk
])
next_ver.append(str(max_pkey))
defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))
def _split_token(self, token):
"""Splits the given token into a list of pkeys.
Args:
token (str): The token with SEPARATOR values.
Returns:
A list of ints.
"""
if token:
segments = token.split(EventStream.SEPARATOR)
else:
segments = [None] * len(self.stream_data)
return segments

View file

@ -16,19 +16,15 @@
from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.streams.event import (
EventStream, EventsStreamData
)
from synapse.handlers.presence import PresenceStreamData
import logging
logger = logging.getLogger(__name__)
class EventStreamHandler(BaseHandler):
stream_data_classes = [
EventsStreamData,
PresenceStreamData,
]
def __init__(self, hs):
super(EventStreamHandler, self).__init__(hs)
@ -43,104 +39,22 @@ class EventStreamHandler(BaseHandler):
self.clock = hs.get_clock()
def get_event_stream_token(self, stream_type, store_id, start_token):
"""Return the next token after this event.
Args:
stream_type (str): The StreamData.EVENT_TYPE
store_id (int): The new storage ID assigned from the data store.
start_token (str): The token the user started with.
Returns:
str: The end token.
"""
for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
if stream_cls.EVENT_TYPE == stream_type:
# this is the stream for this event, so replace this part of
# the token
store_ids = start_token.split(EventStream.SEPARATOR)
store_ids[i] = str(store_id)
return EventStream.SEPARATOR.join(store_ids)
raise RuntimeError("Didn't find a stream type %s" % stream_type)
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
def get_stream(self, auth_user_id, pagin_config, timeout=0):
"""Gets events as an event stream for this user.
This function looks for interesting *events* for this user. This is
different from the notifier, which looks for interested *users* who may
want to know about a single event.
Args:
auth_user_id (str): The user requesting their event stream.
pagin_config (synapse.api.streams.PaginationConfig): The config to
use when obtaining the stream.
timeout (int): The max time to wait for an incoming event in ms.
Returns:
A pagination stream API dict
"""
auth_user = self.hs.parse_userid(auth_user_id)
stream_id = object()
if pagin_config.from_token is None:
pagin_config.from_token = None
try:
if auth_user not in self._streams_per_user:
self._streams_per_user[auth_user] = 0
if auth_user in self._stop_timer_per_user:
self.clock.cancel_call_later(
self._stop_timer_per_user.pop(auth_user))
else:
self.distributor.fire(
"started_user_eventstream", auth_user
)
self._streams_per_user[auth_user] += 1
events, tokens = yield self.notifier.get_events_for(auth_user, pagin_config, timeout)
# construct an event stream with the correct data ordering
stream_data_list = []
for stream_class in EventStreamHandler.stream_data_classes:
stream_data_list.append(stream_class(self.hs))
event_stream = EventStream(auth_user_id, stream_data_list)
chunk = {
"chunk": [e.get_dict() for e in events],
"start_token": tokens[0].to_string(),
"end_token": tokens[1].to_string(),
}
# fix unknown tokens to known tokens
pagin_config = yield event_stream.fix_tokens(pagin_config)
defer.returnValue(chunk)
# register interest in receiving new events
self.notifier.store_events_for(user_id=auth_user_id,
stream_id=stream_id,
from_tok=pagin_config.from_tok)
# see if we can grab a chunk now
data_chunk = yield event_stream.get_chunk(config=pagin_config)
# if there are previous events, return those. If not, wait on the
# new events for 'timeout' seconds.
if len(data_chunk["chunk"]) == 0 and timeout != 0:
results = yield defer.maybeDeferred(
self.notifier.get_events_for,
user_id=auth_user_id,
stream_id=stream_id,
timeout=timeout
)
if results:
defer.returnValue(results)
defer.returnValue(data_chunk)
finally:
# cleanup
self.notifier.purge_events_for(user_id=auth_user_id,
stream_id=stream_id)
self._streams_per_user[auth_user] -= 1
if not self._streams_per_user[auth_user]:
del self._streams_per_user[auth_user]
# 10 seconds of grace to allow the client to reconnect again
# before we think they're gone
def _later():
self.distributor.fire(
"stopped_user_eventstream", auth_user
)
del self._stop_timer_per_user[auth_user]
self._stop_timer_per_user[auth_user] = (
self.clock.call_later(5, _later)
)

View file

@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState
from synapse.api.streams import StreamData
from ._base import BaseHandler
@ -682,41 +681,10 @@ class PresenceHandler(BaseHandler):
user=observed_user,
clock=self.clock
),
stream_type=PresenceStreamData,
store_id=statuscache.serial
)
class PresenceStreamData(StreamData):
def __init__(self, hs):
super(PresenceStreamData, self).__init__(hs)
self.presence = hs.get_handlers().presence_handler
def get_rows(self, user_id, from_key, to_key, limit, direction):
from_key = int(from_key)
to_key = int(to_key)
cachemap = self.presence._user_cachemap
# TODO(paul): limit, and filter by visibility
updates = [(k, cachemap[k]) for k in cachemap
if from_key < cachemap[k].serial <= to_key]
if updates:
clock = self.presence.clock
latest_serial = max([x[1].serial for x in updates])
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
return ((data, latest_serial))
else:
return (([], self.presence._user_cachemap_latest_serial))
def max_token(self):
return self.presence._user_cachemap_latest_serial
PresenceStreamData.EVENT_TYPE = PresenceStreamData
class UserPresenceCache(object):
"""Store an observed user's state and status message.

View file

@ -22,8 +22,6 @@ from synapse.api.errors import RoomError, StoreError, SynapseError
from synapse.api.events.room import (
RoomTopicEvent, RoomMemberEvent, RoomConfigEvent
)
from synapse.api.streams.event import EventStream, EventsStreamData
from synapse.handlers.presence import PresenceStreamData
from synapse.util import stringutils
from ._base import BaseHandler
@ -115,13 +113,24 @@ class MessageHandler(BaseHandler):
"""
yield self.auth.check_joined_room(room_id, user_id)
data_source = [
EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
]
event_stream = EventStream(user_id, data_source)
pagin_config = yield event_stream.fix_tokens(pagin_config)
data_chunk = yield event_stream.get_chunk(config=pagin_config)
defer.returnValue(data_chunk)
data_source = self.hs.get_event_sources().sources[0]
if pagin_config.from_token:
from_token = pagin_config.from_token
else:
from_token = yield self.hs.get_event_sources().get_current_token()
events, next_token = yield data_source.get_pagination_rows(
from_token, pagin_config.to_token, pagin_config.limit, room_id
)
chunk = {
"chunk": [e.get_dict() for e in events],
"start_token": from_token.to_string(),
"end_token": next_token.to_string(),
}
defer.returnValue(chunk)
@defer.inlineCallbacks
def store_room_data(self, event=None, stamp_event=True):
@ -258,18 +267,15 @@ class MessageHandler(BaseHandler):
rooms_ret = []
now_rooms_token = yield self.store.get_room_events_max_id()
# FIXME (erikj): We need to not generate this token,
now_token = yield self.hs.get_event_sources().get_current_token()
# FIXME (erikj): Fix this.
presence_stream = PresenceStreamData(self.hs)
now_presence_token = yield presence_stream.max_token()
presence = yield presence_stream.get_rows(
user_id, 0, now_presence_token, None, None
presence_stream = self.hs.get_event_sources().sources[1]
presence = yield presence_stream.get_new_events_for_user(
user_id, now_token, None, None
)
# FIXME (erikj): We need to not generate this token,
now_token = "%s_%s" % (now_rooms_token, now_presence_token)
limit = pagin_config.limit
if not limit:
limit = 10
@ -291,7 +297,7 @@ class MessageHandler(BaseHandler):
messages, token = yield self.store.get_recent_events_for_room(
event.room_id,
limit=limit,
end_token=now_rooms_token,
end_token=now_token.events_key.to_string(),
)
d["messages"] = {
@ -305,9 +311,7 @@ class MessageHandler(BaseHandler):
except:
logger.exception("Failed to get snapshot")
ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token}
# logger.debug("snapshot_all_rooms returning: %s", ret)
ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token.to_string()}
defer.returnValue(ret)

184
synapse/notifier.py Normal file
View file

@ -0,0 +1,184 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from synapse.util.logutils import log_function
import logging
logger = logging.getLogger(__name__)
class _NotificationListener(object):
def __init__(self, user, from_token, limit, timeout, deferred):
self.user = user
self.from_token = from_token
self.limit = limit
self.timeout = timeout
self.deferred = deferred
self.signal_key_list = []
self.pending_notifications = []
def notify(self, notifier, events, start_token, end_token):
result = (events, (start_token, end_token))
try:
self.deferred.callback(result)
except defer.AlreadyCalledError:
pass
for signal, key in self.signal_key_list:
lst = notifier.signal_keys_to_users.get((signal, key), [])
try:
lst.remove(self)
except:
pass
class Notifier(object):
def __init__(self, hs):
self.hs = hs
self.signal_keys_to_users = {}
self.event_sources = hs.get_event_sources()
@log_function
@defer.inlineCallbacks
def on_new_room_event(self, event, store_id):
room_id = event.room_id
source = self.event_sources.sources[0]
listeners = self.signal_keys_to_users.get(
(source.SIGNAL_NAME, room_id),
[]
)
logger.debug("on_new_room_event self.signal_keys_to_users %s", listeners)
logger.debug("on_new_room_event listeners %s", listeners)
# TODO (erikj): Can we make this more efficient by hitting the
# db once?
for listener in listeners:
events, end_token = yield source.get_new_events_for_user(
listener.user,
listener.from_token,
listener.limit,
key=room_id,
)
if events:
listener.notify(
self, events, listener.from_token, end_token
)
def on_new_user_event(self, *args, **kwargs):
pass
def get_events_for(self, user, pagination_config, timeout):
deferred = defer.Deferred()
self._get_events(
deferred, user, pagination_config.from_token,
pagination_config.limit, timeout
).addErrback(deferred.errback)
return deferred
@defer.inlineCallbacks
def _get_events(self, deferred, user, from_token, limit, timeout):
if not from_token:
from_token = yield self.event_sources.get_current_token()
listener = _NotificationListener(
user,
from_token,
limit,
timeout,
deferred,
)
if timeout:
reactor.callLater(timeout/1000, self._timeout_listener, listener)
yield self._register_with_keys(listener)
yield self._check_for_updates(listener)
return
def _timeout_listener(self, listener):
# TODO (erikj): We should probably set to_token to the current max
# rather than reusing from_token.
listener.notify(
self,
[],
listener.from_token,
listener.from_token,
)
@defer.inlineCallbacks
@log_function
def _register_with_keys(self, listener):
signals_keys = {}
# TODO (erikj): This can probably be replaced by a DeferredList
for source in self.event_sources.sources:
keys = yield source.get_keys_for_user(listener.user)
signals_keys.setdefault(source.SIGNAL_NAME, []).extend(keys)
for signal, keys in signals_keys.items():
for key in keys:
s = self.signal_keys_to_users.setdefault((signal, key), [])
s.append(listener)
listener.signal_key_list.append((signal, key))
logger.debug("New signal_keys_to_users: %s", self.signal_keys_to_users)
defer.returnValue(listener)
@defer.inlineCallbacks
@log_function
def _check_for_updates(self, listener):
# TODO (erikj): We need to think about limits across multiple sources
events = []
from_token = listener.from_token
limit = listener.limit
# TODO (erikj): DeferredList?
for source in self.event_sources.sources:
stuff, new_token = yield source.get_new_events_for_user(
listener.user,
from_token,
limit,
)
events.extend(stuff)
from_token = new_token
end_token = from_token
if events:
listener.notify(self, events, listener.from_token, end_token)
defer.returnValue(listener)

View file

@ -17,7 +17,7 @@
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.api.streams import PaginationConfig
from synapse.streams.config import PaginationConfig
from synapse.rest.base import RestServlet, client_path_pattern
@ -41,11 +41,13 @@ class EventStreamRestServlet(RestServlet):
chunk = yield handler.get_stream(auth_user.to_string(), pagin_config,
timeout=timeout)
defer.returnValue((200, chunk))
def on_OPTIONS(self, request):
return (200, {})
def register_servlets(hs, http_server):
EventStreamRestServlet(hs).register(http_server)

View file

@ -15,7 +15,7 @@
from twisted.internet import defer
from synapse.api.streams import PaginationConfig
from synapse.streams.config import PaginationConfig
from base import RestServlet, client_path_pattern

View file

@ -22,7 +22,7 @@ from synapse.api.events.room import (
MessageEvent, RoomMemberEvent, FeedbackEvent
)
from synapse.api.constants import Feedback
from synapse.api.streams import PaginationConfig
from synapse.streams.config import PaginationConfig
import json
import logging

View file

@ -22,7 +22,7 @@
from synapse.federation import initialize_http_replication
from synapse.federation.handler import FederationEventHandler
from synapse.api.events.factory import EventFactory
from synapse.api.notifier import Notifier
from synapse.notifier import Notifier
from synapse.api.auth import Auth
from synapse.handlers import Handlers
from synapse.rest import RestServletFactory
@ -32,6 +32,7 @@ from synapse.types import UserID, RoomAlias
from synapse.util import Clock
from synapse.util.distributor import Distributor
from synapse.util.lockutils import LockManager
from synapse.streams.events import EventSources
class BaseHomeServer(object):
@ -73,6 +74,7 @@ class BaseHomeServer(object):
'resource_for_federation',
'resource_for_web_client',
'resource_for_content_repo',
'event_sources',
]
def __init__(self, hostname, **kwargs):
@ -182,6 +184,9 @@ class HomeServer(BaseHomeServer):
def build_distributor(self):
return Distributor()
def build_event_sources(self):
return EventSources(self)
def register_servlets(self):
""" Register all servlets associated with this HomeServer.
"""

View file

@ -174,7 +174,7 @@ class StreamStore(SQLBaseStore):
"SELECT * FROM events as e WHERE "
"((room_id IN (%(current)s)) OR "
"(event_id IN (%(invites)s))) "
"AND e.stream_ordering > ? AND e.stream_ordering < ? "
"AND e.stream_ordering > ? AND e.stream_ordering <= ? "
"AND e.outlier = 0 "
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
) % {
@ -293,5 +293,5 @@ class StreamStore(SQLBaseStore):
defer.returnValue("s1")
return
key = res[0]["m"] + 1
key = res[0]["m"]
defer.returnValue("s%d" % (key,))

View file

@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@ -16,21 +16,25 @@
from synapse.api.errors import SynapseError
from synapse.types import StreamToken
import logging
logger = logging.getLogger(__name__)
class PaginationConfig(object):
"""A configuration object which stores pagination parameters."""
def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0):
self.from_tok = StreamToken(from_tok) if from_tok else None
self.to_tok = StreamToken(to_tok) if to_tok else None
self.from_token = StreamToken.from_string(from_tok) if from_tok else None
self.to_token = StreamToken.from_string(to_tok) if to_tok else None
self.direction = 'f' if direction == 'f' else 'b'
self.limit = int(limit)
@classmethod
def from_request(cls, request, raise_invalid_params=True):
params = {
"from_tok": "END",
"direction": 'f',
}
@ -48,9 +52,14 @@ class PaginationConfig(object):
elif raise_invalid_params:
raise SynapseError(400, "%s parameter is invalid." % qp)
if "from_tok" in params and params["from_tok"] == "END":
# TODO (erikj): This is for compatibility only.
del params["from_tok"]
try:
return PaginationConfig(**params)
except:
logger.exception("Failed to create pagination config")
raise SynapseError(400, "Invalid request.")
def __str__(self):
@ -60,48 +69,4 @@ class PaginationConfig(object):
) % (self.from_tok, self.to_tok, self.direction, self.limit)
class PaginationStream(object):
""" An interface for streaming data as chunks. """
TOK_END = "END"
def get_chunk(self, config=None):
""" Return the next chunk in the stream.
Args:
config (PaginationConfig): The config to aid which chunk to get.
Returns:
A dict containing the new start token "start", the new end token
"end" and the data "chunk" as a list.
"""
raise NotImplementedError()
class StreamData(object):
""" An interface for obtaining streaming data from a table. """
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
def get_rows(self, user_id, from_pkey, to_pkey, limit, direction):
""" Get event stream data between the specified pkeys.
Args:
user_id : The user's ID
from_pkey : The starting pkey.
to_pkey : The end pkey. May be -1 to mean "latest".
limit: The max number of results to return.
Returns:
A tuple containing the list of event stream data and the last pkey.
"""
raise NotImplementedError()
def max_token(self):
""" Get the latest currently-valid token.
Returns:
The latest token."""
raise NotImplementedError()

149
synapse/streams/events.py Normal file
View file

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.types import StreamToken
class RoomEventSource(object):
SIGNAL_NAME = "EventStreamSourceSignal"
def __init__(self, hs):
self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_keys_for_user(self, user):
events = yield self.store.get_rooms_for_user_where_membership_is(
user.to_string(),
(Membership.JOIN,),
)
defer.returnValue(set([e.room_id for e in events]))
@defer.inlineCallbacks
def get_new_events_for_user(self, user, from_token, limit, key=None):
# We just ignore the key for now.
to_key = yield self.get_current_token_part()
events, end_key = yield self.store.get_room_events_stream(
user_id=user.to_string(),
from_key=from_token.events_key,
to_key=to_key,
room_id=None,
limit=limit,
)
end_token = from_token.copy_and_replace("events_key", end_key)
defer.returnValue((events, end_token))
def get_current_token_part(self):
return self.store.get_room_events_max_id()
@defer.inlineCallbacks
def get_pagination_rows(self, from_token, to_token, limit, key):
to_key = to_token.events_key if to_token else None
events, next_key = yield self.store.paginate_room_events(
room_id=key,
from_key=from_token.events_key,
to_key=to_key,
direction='b',
limit=limit,
with_feedback=True
)
next_token = from_token.copy_and_replace("events_key", next_key)
defer.returnValue((events, next_token))
class PresenceStreamSource(object):
def __init__(self, hs):
self.hs = hs
self.clock = hs.get_clock()
def get_new_events_for_user(self, user, from_token, limit, key=None):
from_key = int(from_token.presence_key)
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
# TODO(paul): limit, and filter by visibility
updates = [(k, cachemap[k]) for k in cachemap
if from_key < cachemap[k].serial]
if updates:
clock = self.clock
latest_serial = max([x[1].serial for x in updates])
data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
end_token = from_token.copy_and_replace(
"presence_key", latest_serial
)
return ((data, end_token))
else:
end_token = from_token.copy_and_replace(
"presence_key", presence._user_cachemap_latest_serial
)
return (([], end_token))
def get_keys_for_user(self, user):
raise NotImplementedError("get_keys_for_user")
def get_current_token_part(self):
presence = self.hs.get_handlers().presence_handler
return presence._user_cachemap_latest_serial
class EventSources(object):
SOURCE_TYPES = [
RoomEventSource,
PresenceStreamSource,
]
def __init__(self, hs):
self.sources = [t(hs) for t in EventSources.SOURCE_TYPES]
@staticmethod
def create_token(events_key, presence_key):
return StreamToken(events_key=events_key, presence_key=presence_key)
@defer.inlineCallbacks
def get_current_token(self):
events_key = yield self.sources[0].get_current_token_part()
token = EventSources.create_token(events_key, "0")
defer.returnValue(token)
class StreamSource(object):
def get_keys_for_user(self, user):
raise NotImplementedError("get_keys_for_user")
def get_new_events_for_user(self, user, from_token, limit, key=None):
raise NotImplementedError("get_new_events_for_user")
def get_current_token_part(self):
raise NotImplementedError("get_current_token_part")
class PaginationSource(object):
def get_pagination_rows(self, from_token, to_token, limit, key):
raise NotImplementedError("get_rows")

View file

@ -97,72 +97,32 @@ class RoomID(DomainSpecificString):
class StreamToken(
namedtuple(
"Token",
("events_type", "topological_key", "stream_key", "presence_key")
("events_key", "presence_key")
)
):
_SEPARATOR = "_"
_TOPOLOGICAL_PREFIX = "t"
_STREAM_PREFIX = "s"
_TOPOLOGICAL_SEPERATOR = "-"
TOPOLOGICAL_TYPE = "topo"
STREAM_TYPE = "stream"
@classmethod
def from_string(cls, string):
try:
events_part, presence_part = string.split(cls._SEPARATOR)
presence_key = int(presence_part)
topo_length = len(cls._TOPOLOGICAL_PREFIX)
stream_length = len(cls._STREAM_PREFIX)
if events_part[:topo_length] == cls._TOPOLOGICAL_PREFIX:
# topological event token
topo_tok = events_part[topo_length:]
topo_key, stream_key = topo_tok.split(
cls._TOPOLOGICAL_SEPERATOR, 1
)
topo_key = int(topo_key)
stream_key = int(stream_key)
events_type = cls.TOPOLOGICAL_TYPE
elif events_part[:stream_length] == cls._STREAM_PREFIX:
topo_key = None
stream_key = int(events_part[stream_length:])
events_type = cls.STREAM_TYPE
else:
raise
events_key, presence_key = string.split(cls._SEPARATOR)
return cls(
events_type=events_type,
topological_key=topo_key,
stream_key=stream_key,
events_key=events_key,
presence_key=presence_key,
)
except:
raise SynapseError(400, "Invalid Token")
def to_string(self):
if self.events_type == self.TOPOLOGICAL_TYPE:
return "".join([
self._TOPOLOGICAL_PREFIX,
str(self.topological_key),
self._TOPOLOGICAL_SEPERATOR,
str(self.stream_key),
self._SEPARATOR,
str(self.presence_key),
])
elif self.events_type == self.STREAM_TYPE:
return "".join([
self._STREAM_PREFIX,
str(self.stream_key),
self._SEPARATOR,
str(self.presence_key),
])
return "".join([
str(self.events_key),
self._SEPARATOR,
str(self.presence_key),
])
raise RuntimeError("Unrecognized event type: %s", self.events_type)
def copy_and_replace(self, key, new_value):
d = self._asdict()
d[key] = new_value
return StreamToken(**d)