move EventsStream out to its own file

This commit is contained in:
Richard van der Hoff 2019-03-27 10:06:21 +00:00
parent a5798de067
commit aa1e017864
3 changed files with 42 additions and 23 deletions

View file

@ -25,12 +25,12 @@ Each stream is defined by the following information:
update_function: The function that returns a list of updates between two tokens update_function: The function that returns a list of updates between two tokens
""" """
from . import _base from . import _base, events
STREAMS_MAP = { STREAMS_MAP = {
stream.NAME: stream stream.NAME: stream
for stream in ( for stream in (
_base.EventsStream, events.EventsStream,
_base.BackfillStream, _base.BackfillStream,
_base.PresenceStream, _base.PresenceStream,
_base.TypingStream, _base.TypingStream,

View file

@ -26,13 +26,6 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000 MAX_EVENTS_BEHIND = 10000
EventStreamRow = namedtuple("EventStreamRow", (
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
))
BackfillStreamRow = namedtuple("BackfillStreamRow", ( BackfillStreamRow = namedtuple("BackfillStreamRow", (
"event_id", # str "event_id", # str
"room_id", # str "room_id", # str
@ -227,20 +220,6 @@ class Stream(object):
raise NotImplementedError() raise NotImplementedError()
class EventsStream(Stream):
"""We received a new event, or an event went from being an outlier to not
"""
NAME = "events"
ROW_TYPE = EventStreamRow
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_current_events_token
self.update_function = store.get_all_new_forward_event_rows
super(EventsStream, self).__init__(hs)
class BackfillStream(Stream): class BackfillStream(Stream):
"""We fetched some old events and either we had never seen that event before """We fetched some old events and either we had never seen that event before
or it went from being an outlier to not. or it went from being an outlier to not.

View file

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
from ._base import Stream
EventStreamRow = namedtuple("EventStreamRow", (
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
))
class EventsStream(Stream):
"""We received a new event, or an event went from being an outlier to not
"""
NAME = "events"
ROW_TYPE = EventStreamRow
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_current_events_token
self.update_function = store.get_all_new_forward_event_rows
super(EventsStream, self).__init__(hs)