Split out EventPushActionWorkerStore

This commit is contained in:
Erik Johnston 2018-02-21 11:01:13 +00:00
parent a2b25de68d
commit d15d237b0d
2 changed files with 76 additions and 91 deletions

View file

@ -17,7 +17,7 @@ import logging
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.storage import DataStore from synapse.storage import DataStore
from synapse.storage.event_federation import EventFederationStore from synapse.storage.event_federation import EventFederationStore
from synapse.storage.event_push_actions import EventPushActionsStore from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.roommember import RoomMemberStore from synapse.storage.roommember import RoomMemberStore
from synapse.storage.state import StateGroupWorkerStore from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamStore from synapse.storage.stream import StreamStore
@ -38,7 +38,8 @@ logger = logging.getLogger(__name__)
# the method descriptor on the DataStore and chuck them into our class. # the method descriptor on the DataStore and chuck them into our class.
class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore): class SlavedEventStore(EventPushActionsWorkerStore, StateGroupWorkerStore,
BaseSlavedStore):
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
super(SlavedEventStore, self).__init__(db_conn, hs) super(SlavedEventStore, self).__init__(db_conn, hs)
@ -80,30 +81,12 @@ class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):
get_invited_rooms_for_user = RoomMemberStore.__dict__[ get_invited_rooms_for_user = RoomMemberStore.__dict__[
"get_invited_rooms_for_user" "get_invited_rooms_for_user"
] ]
get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
)
_get_unread_counts_by_receipt_txn = (
DataStore._get_unread_counts_by_receipt_txn.__func__
)
_get_unread_counts_by_pos_txn = (
DataStore._get_unread_counts_by_pos_txn.__func__
)
get_recent_event_ids_for_room = ( get_recent_event_ids_for_room = (
StreamStore.__dict__["get_recent_event_ids_for_room"] StreamStore.__dict__["get_recent_event_ids_for_room"]
) )
_get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
has_room_changed_since = DataStore.has_room_changed_since.__func__ has_room_changed_since = DataStore.has_room_changed_since.__func__
get_unread_push_actions_for_user_in_range_for_http = (
DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
)
get_unread_push_actions_for_user_in_range_for_email = (
DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
)
get_push_action_users_in_range = (
DataStore.get_push_action_users_in_range.__func__
)
get_event = DataStore.get_event.__func__ get_event = DataStore.get_event.__func__
get_events = DataStore.get_events.__func__ get_events = DataStore.get_events.__func__
get_rooms_for_user_where_membership_is = ( get_rooms_for_user_where_membership_is = (

View file

@ -62,77 +62,7 @@ def _deserialize_action(actions, is_highlight):
return DEFAULT_NOTIF_ACTION return DEFAULT_NOTIF_ACTION
class EventPushActionsStore(SQLBaseStore): class EventPushActionsWorkerStore(SQLBaseStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(self, db_conn, hs):
super(EventPushActionsStore, self).__init__(db_conn, hs)
self.register_background_index_update(
self.EPA_HIGHLIGHT_INDEX,
index_name="event_push_actions_u_highlight",
table="event_push_actions",
columns=["user_id", "stream_ordering"],
)
self.register_background_index_update(
"event_push_actions_highlights_index",
index_name="event_push_actions_highlights_index",
table="event_push_actions",
columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
where_clause="highlight=1"
)
self._doing_notif_rotation = False
self._rotate_notif_loop = self._clock.looping_call(
self._rotate_notifs, 30 * 60 * 1000
)
def _set_push_actions_for_event_and_users_txn(self, txn, event):
"""
Args:
event: the event set actions for
tuples: list of tuples of (user_id, actions)
"""
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
topological_ordering, notif, highlight
)
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
FROM event_push_actions_staging
WHERE event_id = ?
"""
txn.execute(sql, (
event.room_id, event.internal_metadata.stream_ordering,
event.depth, event.event_id,
))
user_ids = self._simple_select_onecol_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
retcol="user_id",
)
self._simple_delete_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
)
for uid in user_ids:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid,)
)
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000) @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user( def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id self, room_id, user_id, last_read_event_id
@ -449,6 +379,78 @@ class EventPushActionsStore(SQLBaseStore):
# Now return the first `limit` # Now return the first `limit`
defer.returnValue(notifs[:limit]) defer.returnValue(notifs[:limit])
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(self, db_conn, hs):
super(EventPushActionsStore, self).__init__(db_conn, hs)
self.register_background_index_update(
self.EPA_HIGHLIGHT_INDEX,
index_name="event_push_actions_u_highlight",
table="event_push_actions",
columns=["user_id", "stream_ordering"],
)
self.register_background_index_update(
"event_push_actions_highlights_index",
index_name="event_push_actions_highlights_index",
table="event_push_actions",
columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
where_clause="highlight=1"
)
self._doing_notif_rotation = False
self._rotate_notif_loop = self._clock.looping_call(
self._rotate_notifs, 30 * 60 * 1000
)
def _set_push_actions_for_event_and_users_txn(self, txn, event):
"""
Args:
event: the event set actions for
tuples: list of tuples of (user_id, actions)
"""
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
topological_ordering, notif, highlight
)
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
FROM event_push_actions_staging
WHERE event_id = ?
"""
txn.execute(sql, (
event.room_id, event.internal_metadata.stream_ordering,
event.depth, event.event_id,
))
user_ids = self._simple_select_onecol_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
retcol="user_id",
)
self._simple_delete_txn(
txn,
table="event_push_actions_staging",
keyvalues={
"event_id": event.event_id,
},
)
for uid in user_ids:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid,)
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_push_actions_for_user(self, user_id, before=None, limit=50, def get_push_actions_for_user(self, user_id, before=None, limit=50,
only_highlight=False): only_highlight=False):