forked from MirrorHub/synapse
Merge pull request #2900 from matrix-org/erikj/split_event_push_actions
Split out EventPushActionWorkerStore
This commit is contained in:
commit
2ec49826e8
2 changed files with 77 additions and 91 deletions
|
@ -18,7 +18,7 @@ import logging
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.storage.event_federation import EventFederationStore
|
from synapse.storage.event_federation import EventFederationStore
|
||||||
from synapse.storage.event_push_actions import EventPushActionsStore
|
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
|
||||||
from synapse.storage.events_worker import EventsWorkerStore
|
from synapse.storage.events_worker import EventsWorkerStore
|
||||||
from synapse.storage.roommember import RoomMemberStore
|
from synapse.storage.roommember import RoomMemberStore
|
||||||
from synapse.storage.state import StateGroupWorkerStore
|
from synapse.storage.state import StateGroupWorkerStore
|
||||||
|
@ -40,7 +40,8 @@ logger = logging.getLogger(__name__)
|
||||||
# the method descriptor on the DataStore and chuck them into our class.
|
# the method descriptor on the DataStore and chuck them into our class.
|
||||||
|
|
||||||
|
|
||||||
class SlavedEventStore(EventsWorkerStore, StateGroupWorkerStore, BaseSlavedStore):
|
class SlavedEventStore(EventPushActionsWorkerStore, EventsWorkerStore,
|
||||||
|
StateGroupWorkerStore, BaseSlavedStore):
|
||||||
|
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
super(SlavedEventStore, self).__init__(db_conn, hs)
|
super(SlavedEventStore, self).__init__(db_conn, hs)
|
||||||
|
@ -82,30 +83,12 @@ class SlavedEventStore(EventsWorkerStore, StateGroupWorkerStore, BaseSlavedStore
|
||||||
get_invited_rooms_for_user = RoomMemberStore.__dict__[
|
get_invited_rooms_for_user = RoomMemberStore.__dict__[
|
||||||
"get_invited_rooms_for_user"
|
"get_invited_rooms_for_user"
|
||||||
]
|
]
|
||||||
get_unread_event_push_actions_by_room_for_user = (
|
|
||||||
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
|
|
||||||
)
|
|
||||||
_get_unread_counts_by_receipt_txn = (
|
|
||||||
DataStore._get_unread_counts_by_receipt_txn.__func__
|
|
||||||
)
|
|
||||||
_get_unread_counts_by_pos_txn = (
|
|
||||||
DataStore._get_unread_counts_by_pos_txn.__func__
|
|
||||||
)
|
|
||||||
get_recent_event_ids_for_room = (
|
get_recent_event_ids_for_room = (
|
||||||
StreamStore.__dict__["get_recent_event_ids_for_room"]
|
StreamStore.__dict__["get_recent_event_ids_for_room"]
|
||||||
)
|
)
|
||||||
_get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
|
_get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
|
||||||
has_room_changed_since = DataStore.has_room_changed_since.__func__
|
has_room_changed_since = DataStore.has_room_changed_since.__func__
|
||||||
|
|
||||||
get_unread_push_actions_for_user_in_range_for_http = (
|
|
||||||
DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
|
|
||||||
)
|
|
||||||
get_unread_push_actions_for_user_in_range_for_email = (
|
|
||||||
DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
|
|
||||||
)
|
|
||||||
get_push_action_users_in_range = (
|
|
||||||
DataStore.get_push_action_users_in_range.__func__
|
|
||||||
)
|
|
||||||
get_rooms_for_user_where_membership_is = (
|
get_rooms_for_user_where_membership_is = (
|
||||||
DataStore.get_rooms_for_user_where_membership_is.__func__
|
DataStore.get_rooms_for_user_where_membership_is.__func__
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# Copyright 2015 OpenMarket Ltd
|
# Copyright 2015 OpenMarket Ltd
|
||||||
|
# Copyright 2018 New Vector Ltd
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
|
@ -62,77 +63,7 @@ def _deserialize_action(actions, is_highlight):
|
||||||
return DEFAULT_NOTIF_ACTION
|
return DEFAULT_NOTIF_ACTION
|
||||||
|
|
||||||
|
|
||||||
class EventPushActionsStore(SQLBaseStore):
|
class EventPushActionsWorkerStore(SQLBaseStore):
|
||||||
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
|
|
||||||
|
|
||||||
def __init__(self, db_conn, hs):
|
|
||||||
super(EventPushActionsStore, self).__init__(db_conn, hs)
|
|
||||||
|
|
||||||
self.register_background_index_update(
|
|
||||||
self.EPA_HIGHLIGHT_INDEX,
|
|
||||||
index_name="event_push_actions_u_highlight",
|
|
||||||
table="event_push_actions",
|
|
||||||
columns=["user_id", "stream_ordering"],
|
|
||||||
)
|
|
||||||
|
|
||||||
self.register_background_index_update(
|
|
||||||
"event_push_actions_highlights_index",
|
|
||||||
index_name="event_push_actions_highlights_index",
|
|
||||||
table="event_push_actions",
|
|
||||||
columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
|
|
||||||
where_clause="highlight=1"
|
|
||||||
)
|
|
||||||
|
|
||||||
self._doing_notif_rotation = False
|
|
||||||
self._rotate_notif_loop = self._clock.looping_call(
|
|
||||||
self._rotate_notifs, 30 * 60 * 1000
|
|
||||||
)
|
|
||||||
|
|
||||||
def _set_push_actions_for_event_and_users_txn(self, txn, event):
|
|
||||||
"""
|
|
||||||
Args:
|
|
||||||
event: the event set actions for
|
|
||||||
tuples: list of tuples of (user_id, actions)
|
|
||||||
"""
|
|
||||||
|
|
||||||
sql = """
|
|
||||||
INSERT INTO event_push_actions (
|
|
||||||
room_id, event_id, user_id, actions, stream_ordering,
|
|
||||||
topological_ordering, notif, highlight
|
|
||||||
)
|
|
||||||
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
|
|
||||||
FROM event_push_actions_staging
|
|
||||||
WHERE event_id = ?
|
|
||||||
"""
|
|
||||||
|
|
||||||
txn.execute(sql, (
|
|
||||||
event.room_id, event.internal_metadata.stream_ordering,
|
|
||||||
event.depth, event.event_id,
|
|
||||||
))
|
|
||||||
|
|
||||||
user_ids = self._simple_select_onecol_txn(
|
|
||||||
txn,
|
|
||||||
table="event_push_actions_staging",
|
|
||||||
keyvalues={
|
|
||||||
"event_id": event.event_id,
|
|
||||||
},
|
|
||||||
retcol="user_id",
|
|
||||||
)
|
|
||||||
|
|
||||||
self._simple_delete_txn(
|
|
||||||
txn,
|
|
||||||
table="event_push_actions_staging",
|
|
||||||
keyvalues={
|
|
||||||
"event_id": event.event_id,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
for uid in user_ids:
|
|
||||||
txn.call_after(
|
|
||||||
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
|
|
||||||
(event.room_id, uid,)
|
|
||||||
)
|
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
|
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
|
||||||
def get_unread_event_push_actions_by_room_for_user(
|
def get_unread_event_push_actions_by_room_for_user(
|
||||||
self, room_id, user_id, last_read_event_id
|
self, room_id, user_id, last_read_event_id
|
||||||
|
@ -449,6 +380,78 @@ class EventPushActionsStore(SQLBaseStore):
|
||||||
# Now return the first `limit`
|
# Now return the first `limit`
|
||||||
defer.returnValue(notifs[:limit])
|
defer.returnValue(notifs[:limit])
|
||||||
|
|
||||||
|
|
||||||
|
class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||||
|
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
|
||||||
|
|
||||||
|
def __init__(self, db_conn, hs):
|
||||||
|
super(EventPushActionsStore, self).__init__(db_conn, hs)
|
||||||
|
|
||||||
|
self.register_background_index_update(
|
||||||
|
self.EPA_HIGHLIGHT_INDEX,
|
||||||
|
index_name="event_push_actions_u_highlight",
|
||||||
|
table="event_push_actions",
|
||||||
|
columns=["user_id", "stream_ordering"],
|
||||||
|
)
|
||||||
|
|
||||||
|
self.register_background_index_update(
|
||||||
|
"event_push_actions_highlights_index",
|
||||||
|
index_name="event_push_actions_highlights_index",
|
||||||
|
table="event_push_actions",
|
||||||
|
columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
|
||||||
|
where_clause="highlight=1"
|
||||||
|
)
|
||||||
|
|
||||||
|
self._doing_notif_rotation = False
|
||||||
|
self._rotate_notif_loop = self._clock.looping_call(
|
||||||
|
self._rotate_notifs, 30 * 60 * 1000
|
||||||
|
)
|
||||||
|
|
||||||
|
def _set_push_actions_for_event_and_users_txn(self, txn, event):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
event: the event set actions for
|
||||||
|
tuples: list of tuples of (user_id, actions)
|
||||||
|
"""
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
INSERT INTO event_push_actions (
|
||||||
|
room_id, event_id, user_id, actions, stream_ordering,
|
||||||
|
topological_ordering, notif, highlight
|
||||||
|
)
|
||||||
|
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
|
||||||
|
FROM event_push_actions_staging
|
||||||
|
WHERE event_id = ?
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (
|
||||||
|
event.room_id, event.internal_metadata.stream_ordering,
|
||||||
|
event.depth, event.event_id,
|
||||||
|
))
|
||||||
|
|
||||||
|
user_ids = self._simple_select_onecol_txn(
|
||||||
|
txn,
|
||||||
|
table="event_push_actions_staging",
|
||||||
|
keyvalues={
|
||||||
|
"event_id": event.event_id,
|
||||||
|
},
|
||||||
|
retcol="user_id",
|
||||||
|
)
|
||||||
|
|
||||||
|
self._simple_delete_txn(
|
||||||
|
txn,
|
||||||
|
table="event_push_actions_staging",
|
||||||
|
keyvalues={
|
||||||
|
"event_id": event.event_id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
for uid in user_ids:
|
||||||
|
txn.call_after(
|
||||||
|
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
|
||||||
|
(event.room_id, uid,)
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_push_actions_for_user(self, user_id, before=None, limit=50,
|
def get_push_actions_for_user(self, user_id, before=None, limit=50,
|
||||||
only_highlight=False):
|
only_highlight=False):
|
||||||
|
|
Loading…
Reference in a new issue