Don't require to_delete to have event_ids

This commit is contained in:
Erik Johnston 2018-07-24 11:59:16 +01:00
parent 0a8e4f3af9
commit 223341205e

View file

@ -19,7 +19,7 @@ import logging
from collections import OrderedDict, deque, namedtuple from collections import OrderedDict, deque, namedtuple
from functools import wraps from functools import wraps
from six import iteritems, itervalues from six import iteritems
from six.moves import range from six.moves import range
from canonicaljson import json from canonicaljson import json
@ -347,8 +347,9 @@ class EventsStore(EventsWorkerStore):
# state in each room after adding these events # state in each room after adding these events
current_state_for_room = {} current_state_for_room = {}
# map room_id->(to_delete, to_insert) where each entry is # map room_id->(to_delete, to_insert) where to_delete is a list
# a map (type,key)->event_id giving the state delta in each # of type/state keys to remove from current state, and to_insert
# is a map (type,key)->event_id giving the state delta in each
# room # room
state_delta_for_room = {} state_delta_for_room = {}
@ -647,17 +648,16 @@ class EventsStore(EventsWorkerStore):
Assumes that we are only persisting events for one room at a time. Assumes that we are only persisting events for one room at a time.
Returns: Returns:
2-tuple (to_delete, to_insert) where both are state dicts, tuple[list, dict] (to_delete, to_insert): where to_delete are the
i.e. (type, state_key) -> event_id. `to_delete` are the entries to type/state_keys to remove from current_state_events and `to_insert`
first be deleted from current_state_events, `to_insert` are entries are the updates to current_state_events.
to insert.
""" """
existing_state = yield self.get_current_state_ids(room_id) existing_state = yield self.get_current_state_ids(room_id)
to_delete = { to_delete = [
key: ev_id for key, ev_id in iteritems(existing_state) key for key, ev_id in iteritems(existing_state)
if ev_id != current_state.get(key) if key not in current_state
} ]
to_insert = { to_insert = {
key: ev_id for key, ev_id in iteritems(current_state) key: ev_id for key, ev_id in iteritems(current_state)
@ -684,10 +684,10 @@ class EventsStore(EventsWorkerStore):
delete_existing (bool): True to purge existing table rows for the delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to events from the database. This is useful when retrying due to
IntegrityError. IntegrityError.
state_delta_for_room (dict[str, (list[str], list[str])]): state_delta_for_room (dict[str, (list, dict)]):
The current-state delta for each room. For each room, a tuple The current-state delta for each room. For each room, a tuple
(to_delete, to_insert), being a list of event ids to be removed (to_delete, to_insert), being a list of type/state keys to be
from the current state, and a list of event ids to be added to removed from the current state, and a state set to be added to
the current state. the current state.
new_forward_extremeties (dict[str, list[str]]): new_forward_extremeties (dict[str, list[str]]):
The new forward extremities for each room. For each room, a The new forward extremities for each room. For each room, a
@ -765,9 +765,46 @@ class EventsStore(EventsWorkerStore):
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in iteritems(state_delta_by_room): for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple to_delete, to_insert = current_state_tuple
# First we add entries to the current_state_delta_stream. We
# do this before updating the current_state_events table so
# that we can use it to calculate the `prev_event_id`. (This
# allows us to not have to pull out the existing state
# unnecessarily).
sql = """
INSERT INTO current_state_delta_stream
(stream_id, room_id, type, state_key, event_id, prev_event_id)
SELECT ?, ?, ?, ?, ?, (
SELECT event_id FROM current_state_events
WHERE room_id = ? AND type = ? AND state_key = ?
)
"""
txn.executemany(sql, (
(
max_stream_order, room_id, etype, state_key, None,
room_id, etype, state_key,
)
for etype, state_key in to_delete
# We sanity check that we're deleting rather than updating
if (etype, state_key) not in to_insert
))
txn.executemany(sql, (
(
max_stream_order, room_id, etype, state_key, ev_id,
room_id, etype, state_key,
)
for (etype, state_key), ev_id in iteritems(to_insert)
))
# Now we actually update the current_state_events table
txn.executemany( txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?", "DELETE FROM current_state_events"
[(ev_id,) for ev_id in itervalues(to_delete)], " WHERE room_id = ? AND type = ? AND state_key = ?",
(
(room_id, etype, state_key)
for etype, state_key in itertools.chain(to_delete, to_insert)
),
) )
self._simple_insert_many_txn( self._simple_insert_many_txn(
@ -784,25 +821,6 @@ class EventsStore(EventsWorkerStore):
], ],
) )
state_deltas = {key: None for key in to_delete}
state_deltas.update(to_insert)
self._simple_insert_many_txn(
txn,
table="current_state_delta_stream",
values=[
{
"stream_id": max_stream_order,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": ev_id,
"prev_event_id": to_delete.get(key, None),
}
for key, ev_id in iteritems(state_deltas)
]
)
txn.call_after( txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed, self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order, room_id, max_stream_order,
@ -816,7 +834,8 @@ class EventsStore(EventsWorkerStore):
# and which we have added, then we invlidate the caches for all # and which we have added, then we invlidate the caches for all
# those users. # those users.
members_changed = set( members_changed = set(
state_key for ev_type, state_key in state_deltas state_key
for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member if ev_type == EventTypes.Member
) )