From ad0ccf15ea35603c9453523acb3f43661fdbaa12 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Feb 2018 12:29:50 +0000 Subject: [PATCH 1/4] Refactor _set_push_actions_for_event_and_users_txn to use events_and_contexts --- synapse/storage/event_push_actions.py | 65 +++++++++++++++------------ synapse/storage/events.py | 11 ++--- 2 files changed, 42 insertions(+), 34 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index f787431b7a..dac3505480 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -88,11 +88,13 @@ class EventPushActionsStore(SQLBaseStore): self._rotate_notifs, 30 * 60 * 1000 ) - def _set_push_actions_for_event_and_users_txn(self, txn, event): - """ + def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts): + """Handles moving push actions from staging table to main + event_push_actions table for all events in `events_and_contexts`. + Args: - event: the event set actions for - tuples: list of tuples of (user_id, actions) + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting """ sql = """ @@ -105,34 +107,39 @@ class EventPushActionsStore(SQLBaseStore): WHERE event_id = ? """ - txn.execute(sql, ( - event.room_id, event.internal_metadata.stream_ordering, - event.depth, event.event_id, - )) + if events_and_contexts: + txn.executemany(sql, ( + ( + event.room_id, event.internal_metadata.stream_ordering, + event.depth, event.event_id, + ) + for event, _ in events_and_contexts + )) - user_ids = self._simple_select_onecol_txn( - txn, - table="event_push_actions_staging", - keyvalues={ - "event_id": event.event_id, - }, - retcol="user_id", - ) - - self._simple_delete_txn( - txn, - table="event_push_actions_staging", - keyvalues={ - "event_id": event.event_id, - }, - ) - - for uid in user_ids: - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid,) + for event, _ in events_and_contexts: + user_ids = self._simple_select_onecol_txn( + txn, + table="event_push_actions_staging", + keyvalues={ + "event_id": event.event_id, + }, + retcol="user_id", ) + for uid in user_ids: + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (event.room_id, uid,) + ) + + txn.executemany( + "DELETE FROM event_push_actions_staging WHERE event_id = ?", + ( + (event.event_id,) + for event, _ in events_and_contexts + ) + ) + @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000) def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 73177e0bc2..c8b8abc2e7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1162,16 +1162,17 @@ class EventsStore(SQLBaseStore): backfilled (bool): True if the events were backfilled """ + # Insert all the push actions into the event_push_actions table. + self._set_push_actions_for_event_and_users_txn( + txn, + events_and_contexts=events_and_contexts, + ) + if not events_and_contexts: # nothing to do here return for event, context in events_and_contexts: - # Insert all the push actions into the event_push_actions table. - self._set_push_actions_for_event_and_users_txn( - txn, event, - ) - if event.type == EventTypes.Redaction and event.redacts is not None: # Remove the entries in the event_push_actions table for the # redacted event. From 24087bffa932660cf4482b465b759f7161465f8a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Feb 2018 12:33:04 +0000 Subject: [PATCH 2/4] Ensure all push actions are deleted from staging --- synapse/storage/event_push_actions.py | 10 +++++++++- synapse/storage/events.py | 11 ++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index dac3505480..6a122b05a8 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -88,13 +88,21 @@ class EventPushActionsStore(SQLBaseStore): self._rotate_notifs, 30 * 60 * 1000 ) - def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts): + def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts, + all_events_and_contexts): """Handles moving push actions from staging table to main event_push_actions table for all events in `events_and_contexts`. + Also ensures that all events in `all_events_and_contexts` are removed + from the push action staging area. + Args: events_and_contexts (list[(EventBase, EventContext)]): events we are persisting + all_events_and_contexts (list[(EventBase, EventContext)]): all + events that we were going to persist. This includes events + we've already persisted, etc, that wouldn't appear in + events_and_context. """ sql = """ diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c8b8abc2e7..7f8561a0c4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -693,6 +693,8 @@ class EventsStore(SQLBaseStore): list of the event ids which are the forward extremities. """ + all_events_and_contexts = events_and_contexts + max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) @@ -755,6 +757,7 @@ class EventsStore(SQLBaseStore): self._update_metadata_tables_txn( txn, events_and_contexts=events_and_contexts, + all_events_and_contexts=all_events_and_contexts, backfilled=backfilled, ) @@ -1152,13 +1155,18 @@ class EventsStore(SQLBaseStore): ec for ec in events_and_contexts if ec[0] not in to_remove ] - def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled): + def _update_metadata_tables_txn(self, txn, events_and_contexts, + all_events_and_contexts, backfilled): """Update all the miscellaneous tables for new events Args: txn (twisted.enterprise.adbapi.Connection): db connection events_and_contexts (list[(EventBase, EventContext)]): events we are persisting + all_events_and_contexts (list[(EventBase, EventContext)]): all + events that we were going to persist. This includes events + we've already persisted, etc, that wouldn't appear in + events_and_context. backfilled (bool): True if the events were backfilled """ @@ -1166,6 +1174,7 @@ class EventsStore(SQLBaseStore): self._set_push_actions_for_event_and_users_txn( txn, events_and_contexts=events_and_contexts, + all_events_and_contexts=all_events_and_contexts, ) if not events_and_contexts: From 25634ed1523b88f82edaa73124f3afd057fbb16d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Feb 2018 12:40:44 +0000 Subject: [PATCH 3/4] Fix test --- tests/storage/test_event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index d483e7cf9e..dc90e5c243 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -75,7 +75,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): ) yield self.store.runInteraction( "", self.store._set_push_actions_for_event_and_users_txn, - event, + [(event, None)], [(event, None)], ) def _rotate(stream): From c96d547f4dd3c10d3c4be66c40cb07232a16a987 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Feb 2018 11:03:42 +0000 Subject: [PATCH 4/4] Actually use new param --- synapse/storage/event_push_actions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 6a122b05a8..214ace27c9 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -140,11 +140,13 @@ class EventPushActionsStore(SQLBaseStore): (event.room_id, uid,) ) + # Now we delete the staging area for *all* events that were being + # persisted. txn.executemany( "DELETE FROM event_push_actions_staging WHERE event_id = ?", ( (event.event_id,) - for event, _ in events_and_contexts + for event, _ in all_events_and_contexts ) )