forked from MirrorHub/synapse
Fix handling of stream tokens for push. (#8943)
Removes faulty assertions and fixes the logic to ensure the max stream token is always set.
This commit is contained in:
parent
6d02eb22df
commit
b3a4b53587
6 changed files with 18 additions and 50 deletions
1
changelog.d/8943.misc
Normal file
1
changelog.d/8943.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add type hints to push module.
|
|
@ -14,7 +14,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
from typing import TYPE_CHECKING, Any, Dict, Optional
|
from typing import TYPE_CHECKING, Any, Dict
|
||||||
|
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken
|
||||||
|
|
||||||
|
@ -36,12 +36,21 @@ class Pusher(metaclass=abc.ABCMeta):
|
||||||
# This is the highest stream ordering we know it's safe to process.
|
# This is the highest stream ordering we know it's safe to process.
|
||||||
# When new events arrive, we'll be given a window of new events: we
|
# When new events arrive, we'll be given a window of new events: we
|
||||||
# should honour this rather than just looking for anything higher
|
# should honour this rather than just looking for anything higher
|
||||||
# because of potential out-of-order event serialisation. This starts
|
# because of potential out-of-order event serialisation.
|
||||||
# off as None though as we don't know any better.
|
self.max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||||
self.max_stream_ordering = None # type: Optional[int]
|
|
||||||
|
def on_new_notifications(self, max_token: RoomStreamToken) -> None:
|
||||||
|
# We just use the minimum stream ordering and ignore the vector clock
|
||||||
|
# component. This is safe to do as long as we *always* ignore the vector
|
||||||
|
# clock components.
|
||||||
|
max_stream_ordering = max_token.stream
|
||||||
|
|
||||||
|
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
|
||||||
|
self._start_processing()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def on_new_notifications(self, max_token: RoomStreamToken) -> None:
|
def _start_processing(self):
|
||||||
|
"""Start processing push notifications."""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
|
|
|
@ -22,7 +22,6 @@ from twisted.internet.error import AlreadyCalled, AlreadyCancelled
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.push import Pusher
|
from synapse.push import Pusher
|
||||||
from synapse.push.mailer import Mailer
|
from synapse.push.mailer import Mailer
|
||||||
from synapse.types import RoomStreamToken
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from synapse.app.homeserver import HomeServer
|
from synapse.app.homeserver import HomeServer
|
||||||
|
@ -93,20 +92,6 @@ class EmailPusher(Pusher):
|
||||||
pass
|
pass
|
||||||
self.timed_call = None
|
self.timed_call = None
|
||||||
|
|
||||||
def on_new_notifications(self, max_token: RoomStreamToken) -> None:
|
|
||||||
# We just use the minimum stream ordering and ignore the vector clock
|
|
||||||
# component. This is safe to do as long as we *always* ignore the vector
|
|
||||||
# clock components.
|
|
||||||
max_stream_ordering = max_token.stream
|
|
||||||
|
|
||||||
if self.max_stream_ordering:
|
|
||||||
self.max_stream_ordering = max(
|
|
||||||
max_stream_ordering, self.max_stream_ordering
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.max_stream_ordering = max_stream_ordering
|
|
||||||
self._start_processing()
|
|
||||||
|
|
||||||
def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
|
def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
|
||||||
# We could wake up and cancel the timer but there tend to be quite a
|
# We could wake up and cancel the timer but there tend to be quite a
|
||||||
# lot of read receipts so it's probably less work to just let the
|
# lot of read receipts so it's probably less work to just let the
|
||||||
|
@ -172,7 +157,6 @@ class EmailPusher(Pusher):
|
||||||
being run.
|
being run.
|
||||||
"""
|
"""
|
||||||
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
|
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
|
||||||
assert self.max_stream_ordering is not None
|
|
||||||
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
|
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
|
||||||
self.user_id, start, self.max_stream_ordering
|
self.user_id, start, self.max_stream_ordering
|
||||||
)
|
)
|
||||||
|
|
|
@ -26,7 +26,6 @@ from synapse.events import EventBase
|
||||||
from synapse.logging import opentracing
|
from synapse.logging import opentracing
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.push import Pusher, PusherConfigException
|
from synapse.push import Pusher, PusherConfigException
|
||||||
from synapse.types import RoomStreamToken
|
|
||||||
|
|
||||||
from . import push_rule_evaluator, push_tools
|
from . import push_rule_evaluator, push_tools
|
||||||
|
|
||||||
|
@ -122,17 +121,6 @@ class HttpPusher(Pusher):
|
||||||
if should_check_for_notifs:
|
if should_check_for_notifs:
|
||||||
self._start_processing()
|
self._start_processing()
|
||||||
|
|
||||||
def on_new_notifications(self, max_token: RoomStreamToken) -> None:
|
|
||||||
# We just use the minimum stream ordering and ignore the vector clock
|
|
||||||
# component. This is safe to do as long as we *always* ignore the vector
|
|
||||||
# clock components.
|
|
||||||
max_stream_ordering = max_token.stream
|
|
||||||
|
|
||||||
self.max_stream_ordering = max(
|
|
||||||
max_stream_ordering, self.max_stream_ordering or 0
|
|
||||||
)
|
|
||||||
self._start_processing()
|
|
||||||
|
|
||||||
def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
|
def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
|
||||||
# Note that the min here shouldn't be relied upon to be accurate.
|
# Note that the min here shouldn't be relied upon to be accurate.
|
||||||
|
|
||||||
|
@ -192,10 +180,7 @@ class HttpPusher(Pusher):
|
||||||
Never call this directly: use _process which will only allow this to
|
Never call this directly: use _process which will only allow this to
|
||||||
run once per pusher.
|
run once per pusher.
|
||||||
"""
|
"""
|
||||||
|
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
|
||||||
fn = self.store.get_unread_push_actions_for_user_in_range_for_http
|
|
||||||
assert self.max_stream_ordering is not None
|
|
||||||
unprocessed = await fn(
|
|
||||||
self.user_id, self.last_stream_ordering, self.max_stream_ordering
|
self.user_id, self.last_stream_ordering, self.max_stream_ordering
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -129,9 +129,8 @@ class PusherPool:
|
||||||
)
|
)
|
||||||
|
|
||||||
# create the pusher setting last_stream_ordering to the current maximum
|
# create the pusher setting last_stream_ordering to the current maximum
|
||||||
# stream ordering in event_push_actions, so it will process
|
# stream ordering, so it will process pushes from this point onwards.
|
||||||
# pushes from this point onwards.
|
last_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||||
last_stream_ordering = await self.store.get_latest_push_action_stream_ordering()
|
|
||||||
|
|
||||||
await self.store.add_pusher(
|
await self.store.add_pusher(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
|
|
|
@ -894,16 +894,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||||
pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
|
pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
|
||||||
return push_actions
|
return push_actions
|
||||||
|
|
||||||
async def get_latest_push_action_stream_ordering(self):
|
|
||||||
def f(txn):
|
|
||||||
txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
|
|
||||||
return txn.fetchone()
|
|
||||||
|
|
||||||
result = await self.db_pool.runInteraction(
|
|
||||||
"get_latest_push_action_stream_ordering", f
|
|
||||||
)
|
|
||||||
return result[0] or 0
|
|
||||||
|
|
||||||
def _remove_old_push_actions_before_txn(
|
def _remove_old_push_actions_before_txn(
|
||||||
self, txn, room_id, user_id, stream_ordering
|
self, txn, room_id, user_id, stream_ordering
|
||||||
):
|
):
|
||||||
|
|
Loading…
Reference in a new issue