Refactoring and cleanups

A few non-functional changes:

* A bunch of docstrings to document types
* Split `EventsStore._persist_events_txn` up a bit. Hopefully it's a bit more
  readable.
* Rephrase `EventFederationStore._update_min_depth_for_room_txn` to avoid
  mind-bending conditional.
* Rephrase rejected/outlier conditional in `_update_outliers_txn` to avoid
  mind-bending conditional.
This commit is contained in:
Richard van der Hoff 2017-03-17 11:51:13 +00:00
parent 2abe85d50e
commit 5068fb16a5
5 changed files with 265 additions and 81 deletions

View file

@ -15,6 +15,32 @@
class EventContext(object): class EventContext(object):
"""
Attributes:
current_state_ids (dict[(str, str), str]):
The current state map including the current event.
(type, state_key) -> event_id
prev_state_ids (dict[(str, str), str]):
The current state map excluding the current event.
(type, state_key) -> event_id
state_group (int): state group id
rejected (bool|str): A rejection reason if the event was rejected, else
False
push_actions (list[(str, list[object])]): list of (user_id, actions)
tuples
prev_group (int): Previously persisted state group. ``None`` for an
outlier.
delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
(type, state_key) -> event_id. ``None`` for an outlier.
prev_state_events (?): XXX: is this ever set to anything other than
the empty list?
"""
__slots__ = [ __slots__ = [
"current_state_ids", "current_state_ids",
"prev_state_ids", "prev_state_ids",

View file

@ -1537,7 +1537,17 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None): def _prep_event(self, origin, event, state=None, auth_events=None):
"""
Args:
origin:
event:
state:
auth_events:
Returns:
Deferred, which resolves to synapse.events.snapshot.EventContext
"""
context = yield self.state_handler.compute_event_context( context = yield self.state_handler.compute_event_context(
event, old_state=state, event, old_state=state,
) )

View file

@ -177,17 +177,12 @@ class StateHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def compute_event_context(self, event, old_state=None): def compute_event_context(self, event, old_state=None):
""" Fills out the context with the `current state` of the graph. The """Build an EventContext structure for the event.
`current state` here is defined to be the state of the event graph
just before the event - i.e. it never includes `event`
If `event` has `auth_events` then this will also fill out the
`auth_events` field on `context` from the `current_state`.
Args: Args:
event (EventBase) event (synapse.events.EventBase):
Returns: Returns:
an EventContext synapse.events.snapshot.EventContext:
""" """
context = EventContext() context = EventContext()

View file

@ -201,9 +201,9 @@ class EventFederationStore(SQLBaseStore):
def _update_min_depth_for_room_txn(self, txn, room_id, depth): def _update_min_depth_for_room_txn(self, txn, room_id, depth):
min_depth = self._get_min_depth_interaction(txn, room_id) min_depth = self._get_min_depth_interaction(txn, room_id)
do_insert = depth < min_depth if min_depth else True if min_depth and depth >= min_depth:
return
if do_insert:
self._simple_upsert_txn( self._simple_upsert_txn(
txn, txn,
table="room_depth", table="room_depth",

View file

@ -34,14 +34,16 @@ from canonicaljson import encode_canonical_json
from collections import deque, namedtuple, OrderedDict from collections import deque, namedtuple, OrderedDict
from functools import wraps from functools import wraps
import synapse
import synapse.metrics import synapse.metrics
import logging import logging
import math import math
import ujson as json import ujson as json
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -82,6 +84,11 @@ class _EventPeristenceQueue(object):
def add_to_queue(self, room_id, events_and_contexts, backfilled): def add_to_queue(self, room_id, events_and_contexts, backfilled):
"""Add events to the queue, with the given persist_event options. """Add events to the queue, with the given persist_event options.
Args:
room_id (str):
events_and_contexts (list[(EventBase, EventContext)]):
backfilled (bool):
""" """
queue = self._event_persist_queues.setdefault(room_id, deque()) queue = self._event_persist_queues.setdefault(room_id, deque())
if queue: if queue:
@ -227,6 +234,17 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def persist_event(self, event, context, backfilled=False): def persist_event(self, event, context, backfilled=False):
"""
Args:
event (EventBase):
context (EventContext):
backfilled (bool):
Returns:
Deferred: resolves to (int, int): the stream ordering of ``event``,
and the stream ordering of the latest persisted event
"""
deferred = self._event_persist_queue.add_to_queue( deferred = self._event_persist_queue.add_to_queue(
event.room_id, [(event, context)], event.room_id, [(event, context)],
backfilled=backfilled, backfilled=backfilled,
@ -253,6 +271,16 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def _persist_events(self, events_and_contexts, backfilled=False, def _persist_events(self, events_and_contexts, backfilled=False,
delete_existing=False): delete_existing=False):
"""Persist events to db
Args:
events_and_contexts (list[(EventBase, EventContext)]):
backfilled (bool):
delete_existing (bool):
Returns:
Deferred: resolves when the events have been persisted
"""
if not events_and_contexts: if not events_and_contexts:
return return
@ -554,11 +582,87 @@ class EventsStore(SQLBaseStore):
and the rejections table. Things reading from those table will need to check and the rejections table. Things reading from those table will need to check
whether the event was rejected. whether the event was rejected.
If delete_existing is True then existing events will be purged from the Args:
database before insertion. This is useful when retrying due to IntegrityError. txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]):
events to persist
backfilled (bool): True if the events were backfilled
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
IntegrityError.
current_state_for_room (dict[str, (list[str], list[str])]):
The current-state delta for each room. For each room, a tuple
(to_delete, to_insert), being a list of event ids to be removed
from the current state, and a list of event ids to be added to
the current state.
new_forward_extremeties (dict[str, list[str]]):
The new forward extremities for each room. For each room, a
list of the event ids which are the forward extremities.
""" """
self._update_current_state_txn(txn, current_state_for_room)
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
for room_id, current_state_tuple in current_state_for_room.iteritems(): self._update_forward_extremities_txn(
txn,
new_forward_extremities=new_forward_extremeties,
max_stream_order=max_stream_order,
)
# Ensure that we don't have the same event twice.
events_and_contexts = self._filter_events_and_contexts_for_duplicates(
events_and_contexts,
)
self._update_room_depths_txn(
txn,
events_and_contexts=events_and_contexts,
backfilled=backfilled,
)
# _update_outliers_txn filters out any events which have already been
# persisted, and returns the filtered list.
events_and_contexts = self._update_outliers_txn(
txn,
events_and_contexts=events_and_contexts,
)
# From this point onwards the events are only events that we haven't
# seen before.
if delete_existing:
# For paranoia reasons, we go and delete all the existing entries
# for these events so we can reinsert them.
# This gets around any problems with some tables already having
# entries.
self._delete_existing_rows_txn(
txn,
events_and_contexts=events_and_contexts,
)
self._store_event_txn(
txn,
events_and_contexts=events_and_contexts,
)
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
events_and_contexts = self._store_rejected_events_txn(
txn,
events_and_contexts=events_and_contexts,
)
# From this point onwards the events are only ones that weren't
# rejected.
self._update_metadata_tables_txn(
txn,
events_and_contexts=events_and_contexts,
backfilled=backfilled,
)
def _update_current_state_txn(self, txn, state_delta_by_room):
for room_id, current_state_tuple in state_delta_by_room.iteritems():
to_delete, to_insert = current_state_tuple to_delete, to_insert = current_state_tuple
txn.executemany( txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?", "DELETE FROM current_state_events WHERE event_id = ?",
@ -608,7 +712,9 @@ class EventsStore(SQLBaseStore):
txn, self.get_current_state_ids, (room_id,) txn, self.get_current_state_ids, (room_id,)
) )
for room_id, new_extrem in new_forward_extremeties.items(): def _update_forward_extremities_txn(self, txn, new_forward_extremities,
max_stream_order):
for room_id, new_extrem in new_forward_extremities.items():
self._simple_delete_txn( self._simple_delete_txn(
txn, txn,
table="event_forward_extremities", table="event_forward_extremities",
@ -626,7 +732,7 @@ class EventsStore(SQLBaseStore):
"event_id": ev_id, "event_id": ev_id,
"room_id": room_id, "room_id": room_id,
} }
for room_id, new_extrem in new_forward_extremeties.items() for room_id, new_extrem in new_forward_extremities.items()
for ev_id in new_extrem for ev_id in new_extrem
], ],
) )
@ -643,13 +749,22 @@ class EventsStore(SQLBaseStore):
"event_id": event_id, "event_id": event_id,
"stream_ordering": max_stream_order, "stream_ordering": max_stream_order,
} }
for room_id, new_extrem in new_forward_extremeties.items() for room_id, new_extrem in new_forward_extremities.items()
for event_id in new_extrem for event_id in new_extrem
] ]
) )
# Ensure that we don't have the same event twice. @classmethod
# Pick the earliest non-outlier if there is one, else the earliest one. def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
"""Ensure that we don't have the same event twice.
Pick the earliest non-outlier if there is one, else the earliest one.
Args:
events_and_contexts (list[(EventBase, EventContext)]):
Returns:
list[(EventBase, EventContext)]: filtered list
"""
new_events_and_contexts = OrderedDict() new_events_and_contexts = OrderedDict()
for event, context in events_and_contexts: for event, context in events_and_contexts:
prev_event_context = new_events_and_contexts.get(event.event_id) prev_event_context = new_events_and_contexts.get(event.event_id)
@ -662,9 +777,17 @@ class EventsStore(SQLBaseStore):
new_events_and_contexts[event.event_id] = (event, context) new_events_and_contexts[event.event_id] = (event, context)
else: else:
new_events_and_contexts[event.event_id] = (event, context) new_events_and_contexts[event.event_id] = (event, context)
return new_events_and_contexts.values()
events_and_contexts = new_events_and_contexts.values() def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
"""Update min_depth for each room
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
backfilled (bool): True if the events were backfilled
"""
depth_updates = {} depth_updates = {}
for event, context in events_and_contexts: for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids # Remove the any existing cache entries for the event_ids
@ -683,6 +806,21 @@ class EventsStore(SQLBaseStore):
for room_id, depth in depth_updates.items(): for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth) self._update_min_depth_for_room_txn(txn, room_id, depth)
def _update_outliers_txn(self, txn, events_and_contexts):
"""Update any outliers with new event info.
This turns outliers into ex-outliers (unless the new event was
rejected).
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
Returns:
list[(EventBase, EventContext)] new list, without events which
are already in the events table.
"""
txn.execute( txn.execute(
"SELECT event_id, outlier FROM events WHERE event_id in (%s)" % ( "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
",".join(["?"] * len(events_and_contexts)), ",".join(["?"] * len(events_and_contexts)),
@ -697,19 +835,16 @@ class EventsStore(SQLBaseStore):
to_remove = set() to_remove = set()
for event, context in events_and_contexts: for event, context in events_and_contexts:
if context.rejected:
# If the event is rejected then we don't care if the event
# was an outlier or not.
if event.event_id in have_persisted:
# If we have already seen the event then ignore it.
to_remove.add(event)
continue
if event.event_id not in have_persisted: if event.event_id not in have_persisted:
continue continue
to_remove.add(event) to_remove.add(event)
if context.rejected:
# If the event is rejected then we don't care if the event
# was an outlier or not.
continue
outlier_persisted = have_persisted[event.event_id] outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted: if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as # We received a copy of an event that we had already stored as
@ -764,34 +899,16 @@ class EventsStore(SQLBaseStore):
# event isn't an outlier any more. # event isn't an outlier any more.
self._update_backward_extremeties(txn, [event]) self._update_backward_extremeties(txn, [event])
events_and_contexts = [ return [
ec for ec in events_and_contexts if ec[0] not in to_remove ec for ec in events_and_contexts if ec[0] not in to_remove
] ]
@classmethod
def _delete_existing_rows_txn(cls, txn, events_and_contexts):
if not events_and_contexts: if not events_and_contexts:
# Make sure we don't pass an empty list to functions that expect to # nothing to do here
# be storing at least one element.
return return
# From this point onwards the events are only events that we haven't
# seen before.
def event_dict(event):
return {
k: v
for k, v in event.get_dict().items()
if k not in [
"redacted",
"redacted_because",
]
}
if delete_existing:
# For paranoia reasons, we go and delete all the existing entries
# for these events so we can reinsert them.
# This gets around any problems with some tables already having
# entries.
logger.info("Deleting existing") logger.info("Deleting existing")
for table in ( for table in (
@ -823,6 +940,29 @@ class EventsStore(SQLBaseStore):
[(ev.event_id,) for ev, _ in events_and_contexts] [(ev.event_id,) for ev, _ in events_and_contexts]
) )
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
"""
if not events_and_contexts:
# nothing to do here
return
def event_dict(event):
return {
k: v
for k, v in event.get_dict().items()
if k not in [
"redacted",
"redacted_because",
]
}
self._simple_insert_many_txn( self._simple_insert_many_txn(
txn, txn,
table="event_json", table="event_json",
@ -865,6 +1005,19 @@ class EventsStore(SQLBaseStore):
], ],
) )
def _store_rejected_events_txn(self, txn, events_and_contexts):
"""Add rows to the 'rejections' table for received events which were
rejected
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
Returns:
list[(EventBase, EventContext)] new list, without the rejected
events.
"""
# Remove the rejected events from the list now that we've added them # Remove the rejected events from the list now that we've added them
# to the events table and the events_json table. # to the events table and the events_json table.
to_remove = set() to_remove = set()
@ -876,16 +1029,23 @@ class EventsStore(SQLBaseStore):
) )
to_remove.add(event) to_remove.add(event)
events_and_contexts = [ return [
ec for ec in events_and_contexts if ec[0] not in to_remove ec for ec in events_and_contexts if ec[0] not in to_remove
] ]
if not events_and_contexts: def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
# Make sure we don't pass an empty list to functions that expect to """Update all the miscellaneous tables for new events
# be storing at least one element.
return
# From this point onwards the events are only ones that weren't rejected. Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
backfilled (bool): True if the events were backfilled
"""
if not events_and_contexts:
# nothing to do here
return
for event, context in events_and_contexts: for event, context in events_and_contexts:
# Insert all the push actions into the event_push_actions table. # Insert all the push actions into the event_push_actions table.
@ -1005,13 +1165,6 @@ class EventsStore(SQLBaseStore):
# Prefill the event cache # Prefill the event cache
self._add_to_cache(txn, events_and_contexts) self._add_to_cache(txn, events_and_contexts)
if backfilled:
# Backfilled events come before the current state so we don't need
# to update the current state table
return
return
def _add_to_cache(self, txn, events_and_contexts): def _add_to_cache(self, txn, events_and_contexts):
to_prefill = [] to_prefill = []