Compare commits

...

42 commits

Author SHA1 Message Date
Erik Johnston 98e085513e Better terms 2018-06-04 15:34:45 +01:00
Erik Johnston 01f0c0e821 Postgres fast update 2018-06-01 17:25:07 +01:00
Erik Johnston 2908104ed6 Speed things up a bit 2018-06-01 17:13:37 +01:00
Erik Johnston 2a9d3b8a19 fixu[p 2018-06-01 15:14:56 +01:00
Erik Johnston f4eb10533e fixup 2018-06-01 15:05:07 +01:00
Erik Johnston 1205362f1d use farey function 2018-06-01 15:01:13 +01:00
Erik Johnston ca11acf388 foo 2018-06-01 11:06:50 +01:00
Erik Johnston a09d5e45b0 fixu 2018-05-31 19:23:04 +01:00
Erik Johnston 397e4c1d3d fixup 2018-05-31 19:20:50 +01:00
Erik Johnston c5da3f697e fixup 2018-05-31 19:17:17 +01:00
Erik Johnston 9d885e578a fixup 2018-05-31 19:16:25 +01:00
Erik Johnston 8384b1f3aa Fixup 2018-05-31 19:08:31 +01:00
Erik Johnston 1e666c7b72 Use fractions 2018-05-31 18:58:30 +01:00
Erik Johnston 676064f2da schema 2018-05-31 10:00:41 +01:00
Erik Johnston f651f850a4 blah 2018-05-30 11:27:39 +01:00
Erik Johnston 1a64c21301 Make to_s_neighbours set 2018-05-25 14:46:50 +01:00
Erik Johnston 66450785d8 Ignore topological for receipts 2018-05-25 13:54:25 +01:00
Erik Johnston 3e927f85df Check is_state 2018-05-25 12:36:51 +01:00
Erik Johnston e717693d77 fix insert 2018-05-25 12:05:47 +01:00
Erik Johnston 34240a8d18 Implement background update for chunks 2018-05-25 11:42:52 +01:00
Erik Johnston 25ae0bf3ab Set a chunk ID for all forward extremities 2018-05-25 11:42:35 +01:00
Erik Johnston f9f6a6e0c1 Support pagination for tokens without chunk part 2018-05-25 10:56:42 +01:00
Erik Johnston 70639b07ec Fix purge history 2018-05-25 10:56:42 +01:00
Erik Johnston 516207a966 Fix non integer limit 2018-05-25 10:56:42 +01:00
Erik Johnston 7e09f57a88 Fix backfill 2018-05-25 10:56:40 +01:00
Erik Johnston 1fc988b43c Fix clamp leave and disable backfill 2018-05-25 10:55:10 +01:00
Erik Johnston f24c3bf0be Implement pagination using chunks 2018-05-25 10:55:10 +01:00
Erik Johnston b0beffa99e Use calculated topological ordering when persisting events 2018-05-25 10:55:10 +01:00
Erik Johnston b65cc7defa Add chunk ID to pagination token 2018-05-25 10:55:10 +01:00
Erik Johnston 13dbcafb9b Compute new chunks for new events
We also calculate a consistent topological ordering within a chunk, but
it isn't used yet.
2018-05-25 10:54:23 +01:00
Erik Johnston bcc9e7f777 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/room_chunks 2018-05-25 10:53:43 +01:00
Erik Johnston 6e11803ed3 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/room_chunks 2018-05-23 10:54:14 +01:00
Erik Johnston 0a325e5385
Merge pull request #3226 from matrix-org/erikj/chunk_base
Begin adding implementing room chunks
2018-05-18 13:54:34 +01:00
Erik Johnston b725e128f8 Comments 2018-05-18 13:43:01 +01:00
Erik Johnston 0504d809fd More comments 2018-05-17 17:08:36 +01:00
Erik Johnston 12fd6d7688 Document case of unconnected chunks 2018-05-17 16:07:20 +01:00
Erik Johnston a638649254 Make insert_* functions internal and reorder funcs
This makes it clearer what the public interface is vs what subclasses
need to implement.
2018-05-17 15:10:23 +01:00
Erik Johnston d4e4a7344f Increase range of rebalance interval
This both simplifies the code, and ensures that the target node is
roughly in the center of the range rather than at an end.
2018-05-17 15:09:31 +01:00
Erik Johnston c771c124d5 Improve documentation and comments 2018-05-17 15:09:10 +01:00
Erik Johnston 3369354b56 Add note about index in changelog 2018-05-17 14:00:54 +01:00
Erik Johnston 3b505a80dc Merge branch 'develop' of github.com:matrix-org/synapse into erikj/chunk_base 2018-05-17 14:00:41 +01:00
Erik Johnston 943f1029d6 Begin adding implementing room chunks
This commit adds the necessary tables and columns, as well as an
implementation of an online topological sorting algorithm to maintain an
absolute ordering of the room chunks.
2018-05-17 12:05:22 +01:00
14 changed files with 1950 additions and 130 deletions

View file

@ -1,3 +1,11 @@
Changes in <unreleased>
=======================
This release adds an index to the events table. This means that on first
startup there will be an inceased amount of IO until the index is created, and
an increase in disk usage.
Changes in synapse v0.30.0 (2018-05-24)
==========================================
@ -53,7 +61,6 @@ Bug Fixes:
* Fix error in handling receipts (PR #3235)
* Stop the transaction cache caching failures (PR #3255)
Changes in synapse v0.29.1 (2018-05-17)
==========================================
Changes:

View file

@ -714,37 +714,15 @@ class FederationHandler(BaseHandler):
defer.returnValue(events)
@defer.inlineCallbacks
def maybe_backfill(self, room_id, current_depth):
def maybe_backfill(self, room_id, extremities):
"""Checks the database to see if we should backfill before paginating,
and if so do.
Args:
room_id (str)
extremities (list[str]): List of event_ids to backfill from. These
should be event IDs that we don't yet have.
"""
extremities = yield self.store.get_oldest_events_with_depth_in_room(
room_id
)
if not extremities:
logger.debug("Not backfilling as no extremeties found.")
return
# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(
extremities.items(),
key=lambda e: -int(e[1])
)
max_depth = sorted_extremeties_tuple[0][1]
# We don't want to specify too many extremities as it causes the backfill
# request URI to be too long.
extremities = dict(sorted_extremeties_tuple[:5])
if current_depth > max_depth:
logger.debug(
"Not backfilling as we don't need to. %d < %d",
max_depth, current_depth,
)
return
# Now we need to decide which hosts to hit first.
# First we try hosts that are already in the room
# TODO: HEURISTIC ALERT.
@ -844,7 +822,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
event_ids = list(extremities.iterkeys())
event_ids = list(extremities)
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@ -871,7 +849,7 @@ class FederationHandler(BaseHandler):
} for key, state_dict in states.iteritems()
}
for e_id, _ in sorted_extremeties_tuple:
for e_id in event_ids:
likely_domains = get_domains_from_state(states[e_id])
success = yield try_backfill([

View file

@ -211,31 +211,19 @@ class MessageHandler(BaseHandler):
)
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token(
room_id, room_token.stream
)
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
member_event_id,
)
source_config.from_key = yield self.store.clamp_token_before(
room_id, source_config.from_key, leave_token,
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
events, next_key = yield self.store.paginate_room_events(
events, next_key, extremities = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
@ -244,6 +232,20 @@ class MessageHandler(BaseHandler):
event_filter=event_filter,
)
if source_config.direction == 'b' and extremities:
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, extremities
)
events, next_key, extremities = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
direction=source_config.direction,
limit=source_config.limit,
event_filter=event_filter,
)
next_token = pagin_config.from_token.copy_and_replace(
"room_key", next_key
)

View file

@ -534,7 +534,7 @@ class RoomEventSource(object):
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
events, next_key = yield self.store.paginate_room_events(
events, next_key, _ = yield self.store.paginate_room_events(
room_id=key,
from_key=config.from_key,
to_key=config.to_key,

View file

@ -131,6 +131,7 @@ class DataStore(RoomMemberStore, RoomStore,
self._group_updates_id_gen = StreamIdGenerator(
db_conn, "local_group_updates", "stream_id",
)
self._chunk_id_gen = IdGenerator(db_conn, "events", "chunk_id")
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(

View file

@ -0,0 +1,485 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import logging
from collections import deque
from fractions import Fraction
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
from synapse.util.katriel_bodlaender import OrderedListStore
import synapse.metrics
metrics = synapse.metrics.get_metrics_for(__name__)
rebalance_counter = metrics.register_counter("rebalances")
logger = logging.getLogger(__name__)
class ChunkDBOrderedListStore(OrderedListStore):
"""Used as the list store for room chunks, efficiently maintaining them in
topological order on updates.
A room chunk is a connected portion of the room events DAG. Chunks are
constructed so that they have the additional property that for all events in
the chunk, either all of their prev_events are in that chunk or none of them
are. This ensures that no event that is subsequently received needs to be
inserted into the middle of a chunk, since it cannot both reference an event
in the chunk and be referenced by an event in the chunk (assuming no
cycles).
As such the set of chunks in a room inherits a DAG, i.e. if an event in one
chunk references an event in a second chunk, then we say that the first
chunk references the second, and thus forming a DAG. (This means that chunks
start off disconnected until an event is received that connects the two
chunks.)
We can therefore end up with multiple chunks in a room when the server
misses some events, e.g. due to the server being offline for a time.
The server may only have a subset of all events in a room, in which case
its possible for the server to have chunks that are unconnected from each
other. The ordering between unconnected chunks is arbitrary.
The class is designed for use inside transactions and so takes a
transaction object in the constructor. This means that it needs to be
re-instantiated in each transaction, so all state needs to be stored
in the database.
Internally the ordering is implemented using floats, and the average is
taken when a node is inserted between other nodes. To avoid precision
errors a minimum difference between sucessive orderings is attempted to be
kept; whenever the difference is too small we attempt to rebalance. See
the `_rebalance` function for implementation details.
Note that OrderedListStore orders nodes such that source of an edge
comes before the target. This is counter intuitive when edges represent
causality, so for the purposes of ordering algorithm we invert the edge
directions, i.e. if chunk A has a prev chunk of B then we say that the
edge is from B to A. This ensures that newer chunks get inserted at the
end (rather than the start).
Note: Calls to `add_node` and `add_edge` cannot overlap for the same room,
and so callers should perform some form of per-room locking when using
this class.
Args:
txn
room_id (str)
clock
rebalance_digits (int): When a rebalance is triggered we rebalance
in a range around the node, where the bounds are rounded to this
number of digits.
min_difference (int): A rebalance is triggered when the difference
between two successive orderings is less than the reciprocal of
this.
"""
def __init__(self,
txn, room_id, clock, database_engine,
rebalance_max_denominator=100,
max_denominator=100000):
self.txn = txn
self.room_id = room_id
self.clock = clock
self.database_engine = database_engine
self.rebalance_md = rebalance_max_denominator
self.max_denominator = max_denominator
def is_before(self, a, b):
"""Implements OrderedListStore"""
return self._get_order(a) < self._get_order(b)
def get_prev(self, node_id):
"""Implements OrderedListStore"""
sql = """
SELECT chunk_id FROM chunk_linearized
WHERE next_chunk_id = ?
"""
self.txn.execute(sql, (node_id,))
row = self.txn.fetchone()
if row:
return row[0]
return None
def get_next(self, node_id):
"""Implements OrderedListStore"""
sql = """
SELECT next_chunk_id FROM chunk_linearized
WHERE chunk_id = ?
"""
self.txn.execute(sql, (node_id,))
row = self.txn.fetchone()
if row:
return row[0]
return None
def _insert_before(self, node_id, target_id):
"""Implements OrderedListStore"""
rebalance = False # Set to true if we need to trigger a rebalance
if target_id:
before_id = self.get_prev(target_id)
if before_id:
new_order = self._insert_between(node_id, before_id, target_id)
else:
new_order = self._insert_at_start(node_id, target_id)
else:
# If target_id is None then we insert at the end.
self.txn.execute("""
SELECT chunk_id
FROM chunk_linearized
WHERE room_id = ? AND next_chunk_id is NULL
""", (self.room_id,))
row = self.txn.fetchone()
if row:
new_order = self._insert_at_end(node_id, row[0])
else:
new_order = self._insert_first(node_id)
rebalance = new_order.denominator > self.max_denominator
if rebalance:
self._rebalance(node_id)
def _insert_after(self, node_id, target_id):
"""Implements OrderedListStore"""
rebalance = False # Set to true if we need to trigger a rebalance
next_chunk_id = None
if target_id:
next_chunk_id = self.get_next(target_id)
if next_chunk_id:
new_order = self._insert_between(node_id, target_id, next_chunk_id)
else:
new_order = self._insert_at_end(node_id, target_id)
else:
# If target_id is None then we insert at the start.
self.txn.execute("""
SELECT chunk_id
FROM chunk_linearized
NATURAL JOIN chunk_linearized_first
WHERE room_id = ?
""", (self.room_id,))
row = self.txn.fetchone()
if row:
new_order = self._insert_at_start(node_id, row[0])
else:
new_order = self._insert_first(node_id)
rebalance = new_order.denominator > self.max_denominator
if rebalance:
self._rebalance(node_id)
def _insert_between(self, node_id, left_id, right_id):
left_order = self._get_order(left_id)
right_order = self._get_order(right_id)
assert left_order < right_order
new_order = stern_brocot_single(left_order, right_order)
SQLBaseStore._simple_update_one_txn(
self.txn,
table="chunk_linearized",
keyvalues={"chunk_id": left_id},
updatevalues={"next_chunk_id": node_id},
)
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_linearized",
values={
"chunk_id": node_id,
"room_id": self.room_id,
"next_chunk_id": right_id,
"numerator": int(new_order.numerator),
"denominator": int(new_order.denominator),
}
)
return new_order
def _insert_at_end(self, node_id, last_id):
last_order = self._get_order(last_id)
new_order = Fraction(int(math.ceil(last_order)) + 1, 1)
SQLBaseStore._simple_update_one_txn(
self.txn,
table="chunk_linearized",
keyvalues={"chunk_id": last_id},
updatevalues={"next_chunk_id": node_id},
)
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_linearized",
values={
"chunk_id": node_id,
"room_id": self.room_id,
"next_chunk_id": None,
"numerator": int(new_order.numerator),
"denominator": int(new_order.denominator),
}
)
return new_order
def _insert_at_start(self, node_id, first_id):
first_order = self._get_order(first_id)
new_order = stern_brocot_single(0, first_order)
SQLBaseStore._simple_update_one_txn(
self.txn,
table="chunk_linearized_first",
keyvalues={"room_id": self.room_id},
updatevalues={"chunk_id": node_id},
)
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_linearized",
values={
"chunk_id": node_id,
"room_id": self.room_id,
"next_chunk_id": first_id,
"numerator": int(new_order.numerator),
"denominator": int(new_order.denominator),
}
)
return new_order
def _insert_first(self, node_id):
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_linearized_first",
values={
"room_id": self.room_id,
"chunk_id": node_id,
},
)
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_linearized",
values={
"chunk_id": node_id,
"room_id": self.room_id,
"next_chunk_id": None,
"numerator": 1,
"denominator": 1,
}
)
return Fraction(1, 1)
def get_nodes_with_edges_to(self, node_id):
"""Implements OrderedListStore"""
# Note that we use the inverse relation here
sql = """
SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
INNER JOIN chunk_linearized AS l ON g.prev_id = l.chunk_id
WHERE g.chunk_id = ?
"""
self.txn.execute(sql, (node_id,))
return [(Fraction(n, d), c) for c, n, d in self.txn]
def get_nodes_with_edges_from(self, node_id):
"""Implements OrderedListStore"""
# Note that we use the inverse relation here
sql = """
SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
INNER JOIN chunk_linearized AS l ON g.chunk_id = l.chunk_id
WHERE g.prev_id = ?
"""
self.txn.execute(sql, (node_id,))
return [(Fraction(n, d), c) for c, n, d in self.txn]
def _delete_ordering(self, node_id):
"""Implements OrderedListStore"""
next_chunk_id = SQLBaseStore._simple_select_one_onecol_txn(
self.txn,
table="chunk_linearized",
keyvalues={
"chunk_id": node_id,
},
retcol="next_chunk_id",
)
SQLBaseStore._simple_delete_txn(
self.txn,
table="chunk_linearized",
keyvalues={"chunk_id": node_id},
)
sql = """
UPDATE chunk_linearized SET next_chunk_id = ?
WHERE next_chunk_id = ?
"""
self.txn.execute(sql, (next_chunk_id, node_id,))
sql = """
UPDATE chunk_linearized_first SET chunk_id = ?
WHERE chunk_id = ?
"""
self.txn.execute(sql, (next_chunk_id, node_id,))
def _add_edge_to_graph(self, source_id, target_id):
"""Implements OrderedListStore"""
# Note that we use the inverse relation
SQLBaseStore._simple_insert_txn(
self.txn,
table="chunk_graph",
values={"chunk_id": target_id, "prev_id": source_id}
)
def _get_order(self, node_id):
"""Get the ordering of the given node.
"""
row = SQLBaseStore._simple_select_one_txn(
self.txn,
table="chunk_linearized",
keyvalues={"chunk_id": node_id},
retcols=("numerator", "denominator",),
)
return Fraction(row["numerator"], row["denominator"])
def _rebalance(self, node_id):
"""Rebalances the list around the given node to ensure that the
ordering floats don't get too small.
This works by finding a range that includes the given node, and
recalculating the ordering floats such that they're equidistant in
that range.
"""
logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id)
old_order = self._get_order(node_id)
a, b, c, d = find_farey_terms(old_order, self.rebalance_md)
assert old_order < Fraction(a, b)
assert c + d > self.rebalance_md
with_sql = """
WITH RECURSIVE chunks (chunk_id, next, n, a, b, c, d) AS (
SELECT chunk_id, next_chunk_id, ?, ?, ?, ?, ?
FROM chunk_linearized WHERE chunk_id = ?
UNION ALL
SELECT n.chunk_id, n.next_chunk_id, n,
c, d, ((n + b) / d) * c - a, ((n + b) / d) * d - b
FROM chunks AS c
INNER JOIN chunk_linearized AS l ON l.chunk_id = c.chunk_id
INNER JOIN chunk_linearized AS n ON n.chunk_id = l.next_chunk_id
WHERE c * 1.0 / d > n.numerator * 1.0 / n.denominator
)
"""
if isinstance(self.database_engine, PostgresEngine):
sql = with_sql + """
UPDATE chunk_linearized AS l
SET numerator = a, denominator = b
FROM chunks AS c
WHERE c.chunk_id = l.chunk_id
"""
else:
sql = with_sql + """
UPDATE chunk_linearized
SET (numerator, denominator) = (
SELECT a, b FROM chunks
WHERE chunks.chunk_id = chunk_linearized.chunk_id
)
WHERE chunk_id in (SELECT chunk_id FROM chunks)
"""
self.txn.execute(sql, (
self.rebalance_md, a, b, c, d, node_id
))
rebalance_counter.inc()
def stern_brocot_single(min_frac, max_frac):
assert 0 <= min_frac < max_frac
# If the determinant is 1 then the fraction with smallest numerator and
# denominator in the range is the mediant, so we don't have to use the
# stern brocot tree to search for it.
determinant = (
min_frac.denominator * max_frac.numerator
- min_frac.numerator * max_frac.denominator
)
if determinant == 1:
return Fraction(
min_frac.numerator + max_frac.numerator,
min_frac.denominator + max_frac.denominator,
)
a, b, c, d = 0, 1, 1, 0
while True:
f = Fraction(a + c, b + d)
if f <= min_frac:
a, b, c, d = a + c, b + d, c, d
elif min_frac < f < max_frac:
return f
else:
a, b, c, d = a, b, a + c, b + d
def find_farey_terms(min_frac, max_denom):
a, b, c, d = 0, 1, 1, 0
while True:
cur_frac = Fraction(a + c, b + d)
if b + d > max_denom:
break
if cur_frac <= min_frac:
a, b, c, d = a + c, b + d, c, d
elif min_frac < cur_frac:
a, b, c, d = a, b, a + c, b + d
if Fraction(a, b) <= min_frac:
k = int((max_denom + b) / d)
a, b, c, d = c, d, (k*c-a), (k*d-b)
return a, b, c, d

View file

@ -23,6 +23,7 @@ import simplejson as json
from twisted.internet import defer
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
from synapse.util.async import ObservableDeferred
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import (
@ -201,6 +202,7 @@ def _retry_on_integrity_error(func):
class EventsStore(EventsWorkerStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
EVENT_FIELDS_CHUNK = "event_fields_chunk_id"
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
@ -232,6 +234,20 @@ class EventsStore(EventsWorkerStore):
psql_only=True,
)
self.register_background_index_update(
"events_chunk_index",
index_name="events_chunk_index",
table="events",
columns=["room_id", "chunk_id", "topological_ordering", "stream_ordering"],
unique=True,
psql_only=True,
)
self.register_background_update_handler(
self.EVENT_FIELDS_CHUNK,
self._background_compute_chunks,
)
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
@ -1010,13 +1026,20 @@ class EventsStore(EventsWorkerStore):
}
)
sql = (
"UPDATE events SET outlier = ?"
" WHERE event_id = ?"
chunk_id, topo = self._compute_chunk_id_txn(
txn, event.room_id, event.event_id,
[eid for eid, _ in event.prev_events],
)
txn.execute(
sql,
(False, event.event_id,)
self._simple_update_txn(
txn,
table="events",
keyvalues={"event_id": event.event_id},
updatevalues={
"outlier": False,
"chunk_id": chunk_id,
"topological_ordering": topo,
},
)
# Update the event_backward_extremities table now that this
@ -1099,13 +1122,22 @@ class EventsStore(EventsWorkerStore):
],
)
if event.internal_metadata.is_outlier():
chunk_id, topo = None, 0
else:
chunk_id, topo = self._compute_chunk_id_txn(
txn, event.room_id, event.event_id,
[eid for eid, _ in event.prev_events],
)
self._simple_insert_many_txn(
txn,
table="events",
values=[
{
"stream_ordering": event.internal_metadata.stream_ordering,
"topological_ordering": event.depth,
"chunk_id": chunk_id,
"topological_ordering": topo,
"depth": event.depth,
"event_id": event.event_id,
"room_id": event.room_id,
@ -1335,6 +1367,214 @@ class EventsStore(EventsWorkerStore):
(event.event_id, event.redacts)
)
def _compute_chunk_id_txn(self, txn, room_id, event_id, prev_event_ids):
"""Computes the chunk ID and topological ordering for an event.
Also handles updating chunk_graph table.
Args:
txn,
room_id (str)
event_id (str)
prev_event_ids (list[str])
Returns:
tuple[int, int]: Returns the chunk_id, topological_ordering for
the event
"""
# We calculate the chunk for an event using the following rules:
#
# 1. If all prev events have the same chunk ID then use that chunk ID
# 2. If we have none of the prev events but do have events pointing to
# it, then we use their chunk ID if:
# - Theyre all in the same chunk, and
# - All their prev events match the events being inserted
# 3. Otherwise, create a new chunk and use that
# Set of chunks that the event refers to. Includes None if there were
# prev events that we don't have (or don't have a chunk for)
prev_chunk_ids = set()
for eid in prev_event_ids:
chunk_id = self._simple_select_one_onecol_txn(
txn,
table="events",
keyvalues={"event_id": eid},
retcol="chunk_id",
allow_none=True,
)
prev_chunk_ids.add(chunk_id)
forward_events = self._simple_select_onecol_txn(
txn,
table="event_edges",
keyvalues={
"prev_event_id": event_id,
"is_state": False,
},
retcol="event_id",
)
# Set of chunks that refer to this event.
forward_chunk_ids = set()
# Set of event_ids of all prev_events of those in `forward_events`. This
# is guaranteed to contain at least the given event_id.
sibling_events = set()
for eid in set(forward_events):
chunk_id = self._simple_select_one_onecol_txn(
txn,
table="events",
keyvalues={"event_id": eid},
retcol="chunk_id",
allow_none=True,
)
if chunk_id is not None:
# chunk_id can be None if it's an outlier
forward_chunk_ids.add(chunk_id)
pes = self._simple_select_onecol_txn(
txn,
table="event_edges",
keyvalues={
"event_id": eid,
"is_state": False,
},
retcol="prev_event_id",
)
sibling_events.update(pes)
table = ChunkDBOrderedListStore(
txn, room_id, self.clock, self.database_engine,
)
# If there is only one previous chunk (and that isn't None), then this
# satisfies condition one.
if len(prev_chunk_ids) == 1 and None not in prev_chunk_ids:
chunk_id = list(prev_chunk_ids)[0]
# This event is being inserted at the end of the chunk
new_topo = self._simple_select_one_onecol_txn(
txn,
table="events",
keyvalues={
"room_id": room_id,
"chunk_id": chunk_id,
},
retcol="COALESCE(MAX(topological_ordering), 0)",
)
new_topo += 1
# We need to now update the database with any new edges between chunks
current_prev_ids = set()
current_forward_ids = self._simple_select_onecol_txn(
txn,
table="chunk_graph",
keyvalues={
"prev_id": chunk_id,
},
retcol="chunk_id",
)
# If there is only one forward chunk and only one sibling event (which
# would be the given event), then this satisfies condition two.
elif len(forward_chunk_ids) == 1 and len(sibling_events) == 1:
chunk_id = list(forward_chunk_ids)[0]
# This event is being inserted at the start of the chunk
new_topo = self._simple_select_one_onecol_txn(
txn,
table="events",
keyvalues={
"room_id": room_id,
"chunk_id": chunk_id,
},
retcol="COALESCE(MIN(topological_ordering), 0)",
)
new_topo -= 1
# We need to now update the database with any new edges between chunks
current_prev_ids = self._simple_select_onecol_txn(
txn,
table="chunk_graph",
keyvalues={
"chunk_id": chunk_id,
},
retcol="prev_id",
)
current_forward_ids = set()
else:
chunk_id = self._chunk_id_gen.get_next()
new_topo = 0
# We've generated a new chunk, so we have to tell the
# ChunkDBOrderedListStore about that.
table.add_node(chunk_id)
# We need to now update the database with any new edges between chunks
current_prev_ids = self._simple_select_onecol_txn(
txn,
table="chunk_graph",
keyvalues={
"chunk_id": chunk_id,
},
retcol="prev_id",
)
current_forward_ids = self._simple_select_onecol_txn(
txn,
table="chunk_graph",
keyvalues={
"prev_id": chunk_id,
},
retcol="chunk_id",
)
prev_chunk_ids = set(
pid for pid in prev_chunk_ids
if pid is not None and pid not in current_prev_ids and pid != chunk_id
)
forward_chunk_ids = set(
fid for fid in forward_chunk_ids
if fid not in current_forward_ids and fid != chunk_id
)
if prev_chunk_ids:
for pid in prev_chunk_ids:
# Note that the edge direction is reversed than what you might
# expect. See ChunkDBOrderedListStore for more details.
table.add_edge(pid, chunk_id)
if forward_chunk_ids:
for fid in forward_chunk_ids:
# Note that the edge direction is reversed than what you might
# expect. See ChunkDBOrderedListStore for more details.
table.add_edge(chunk_id, fid)
# We now need to update the backwards extremities for the chunks.
txn.executemany("""
INSERT INTO chunk_backwards_extremities (chunk_id, event_id)
SELECT ?, ? WHERE NOT EXISTS (
SELECT event_id FROM events WHERE event_id = ?
AND NOT outlier
)
""", [(chunk_id, eid, eid) for eid in prev_event_ids])
self._simple_delete_txn(
txn,
table="chunk_backwards_extremities",
keyvalues={"event_id": event_id},
)
return chunk_id, new_topo
@defer.inlineCallbacks
def have_events_in_timeline(self, event_ids):
"""Given a list of event ids, check if we have already processed and
@ -1628,6 +1868,66 @@ class EventsStore(EventsWorkerStore):
defer.returnValue(result)
@defer.inlineCallbacks
def _background_compute_chunks(self, progress, batch_size):
up_to_stream_id = progress.get("up_to_stream_id")
if up_to_stream_id is None:
up_to_stream_id = self.get_current_events_token() + 1
rows_inserted = progress.get("rows_inserted", 0)
def reindex_chunks_txn(txn):
txn.execute("""
SELECT stream_ordering, room_id, event_id,
(
SELECT COALESCE(array_agg(prev_event_id), ARRAY[]::TEXT[])
FROM event_edges AS eg
WHERE NOT is_state AND eg.event_id = e.event_id
) AS prev_events
FROM events AS e
WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL
ORDER BY stream_ordering DESC
LIMIT ?
""", (up_to_stream_id, False, batch_size))
rows = txn.fetchall()
stream_ordering = up_to_stream_id
for stream_ordering, room_id, event_id, prev_events in rows:
chunk_id, topo = self._compute_chunk_id_txn(
txn, room_id, event_id, prev_events,
)
self._simple_update_txn(
txn,
table="events",
keyvalues={"event_id": event_id},
updatevalues={
"chunk_id": chunk_id,
"topological_ordering": topo,
},
)
progress = {
"up_to_stream_id": stream_ordering,
"rows_inserted": rows_inserted + len(rows)
}
self._background_update_progress_txn(
txn, self.EVENT_FIELDS_CHUNK, progress
)
return len(rows)
result = yield self.runInteraction(
self.EVENT_FIELDS_CHUNK, reindex_chunks_txn
)
if not result:
yield self._end_background_update(self.EVENT_FIELDS_CHUNK)
defer.returnValue(result)
def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
return -self._backfill_id_gen.get_current_token()
@ -1908,17 +2208,42 @@ class EventsStore(EventsWorkerStore):
should_delete_expr += " AND event_id NOT LIKE ?"
should_delete_params += ("%:" + self.hs.hostname, )
should_delete_params += (room_id, token.topological)
txn.execute(
"INSERT INTO events_to_purge"
" SELECT event_id, %s"
" FROM events AS e LEFT JOIN state_events USING (event_id)"
" WHERE e.room_id = ? AND topological_ordering < ?" % (
should_delete_expr,
),
should_delete_params,
next_token = RoomStreamToken(
token.chunk,
token.topological - 1,
token.stream,
)
while True:
rows, next_token, _ = self._paginate_room_events_txn(
txn, room_id, next_token, direction='b', limit=1000,
)
next_token = RoomStreamToken.parse(next_token)
if len(rows) == 0:
break
txn.executemany(
"""INSERT INTO events_to_purge
SELECT event_id, %s
FROM events
LEFT JOIN state_events USING (event_id)
WHERE event_id = ?
""" % (
should_delete_expr,
),
(
should_delete_params + (row.event_id,)
for row in rows
),
)
txn.execute("""
DELETE FROM events_to_purge
WHERE event_id IN (
SELECT event_id FROM event_forward_extremities
)
""")
txn.execute(
"SELECT event_id, should_delete FROM events_to_purge"
)
@ -1937,7 +2262,7 @@ class EventsStore(EventsWorkerStore):
"SELECT DISTINCT e.event_id FROM events_to_purge AS e"
" INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
" LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
" WHERE ep2.event_id IS NULL",
" WHERE ep2.event_id IS NULL AND NOT ed.is_state",
)
new_backwards_extrems = txn.fetchall()
@ -1957,6 +2282,27 @@ class EventsStore(EventsWorkerStore):
]
)
txn.execute(
"""DELETE FROM chunk_backwards_extremities
WHERE event_id IN (
SELECT event_id FROM events WHERE room_id = ?
)
""",
(room_id,)
)
txn.execute(
"""
INSERT INTO chunk_backwards_extremities
SELECT DISTINCT ee.chunk_id, e.event_id
FROM events_to_purge AS e
INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id
INNER JOIN events AS ee ON ee.event_id = ed.event_id
LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id
WHERE ep2.event_id IS NULL AND NOT ed.is_state
""",
)
logger.info("[purge] finding redundant state groups")
# Get all state groups that are only referenced by events that are
@ -2109,7 +2455,7 @@ class EventsStore(EventsWorkerStore):
# Mark all state and own events as outliers
logger.info("[purge] marking remaining events as outliers")
txn.execute(
"UPDATE events SET outlier = ?"
"UPDATE events SET outlier = ?, chunk_id = NULL"
" WHERE event_id IN ("
" SELECT event_id FROM events_to_purge "
" WHERE NOT should_delete"

View file

@ -369,7 +369,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
sql = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
"SELECT stream_ordering FROM events"
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
)
@ -377,10 +377,8 @@ class ReceiptsStore(ReceiptsWorkerStore):
txn.execute(sql, (room_id, receipt_type, user_id))
if topological_ordering:
for to, so, _ in txn:
if int(to) > topological_ordering:
return False
elif int(to) == topological_ordering and int(so) >= stream_ordering:
for so, in txn:
if int(so) >= stream_ordering:
return False
self._simple_delete_txn(

View file

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage._base import SQLBaseStore, LoggingTransaction
from synapse.storage.prepare_database import get_statements
SQL = """
ALTER TABLE events ADD COLUMN chunk_id BIGINT;
-- FIXME: Add index on contains_url
INSERT INTO background_updates (update_name, progress_json) VALUES
('events_chunk_index', '{}');
-- Stores how chunks of graph relate to each other
CREATE TABLE chunk_graph (
chunk_id BIGINT NOT NULL,
prev_id BIGINT NOT NULL
);
CREATE UNIQUE INDEX chunk_graph_id ON chunk_graph (chunk_id, prev_id);
CREATE INDEX chunk_graph_prev_id ON chunk_graph (prev_id);
-- The extremities in each chunk. Note that these are pointing to events that
-- we don't have, rather than boundary between chunks.
CREATE TABLE chunk_backwards_extremities (
chunk_id BIGINT NOT NULL,
event_id TEXT NOT NULL
);
CREATE INDEX chunk_backwards_extremities_id ON chunk_backwards_extremities(
chunk_id, event_id
);
CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities(
event_id
);
-- Maintains an absolute ordering of chunks. Gets updated when we see new
-- edges between chunks.
CREATE TABLE chunk_linearized (
chunk_id BIGINT NOT NULL,
room_id TEXT NOT NULL,
next_chunk_id BIGINT,
numerator BIGINT NOT NULL,
denominator BIGINT NOT NULL
);
CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
CREATE UNIQUE INDEX chunk_linearized_next_id ON chunk_linearized (
next_chunk_id, room_id
);
CREATE TABLE chunk_linearized_first (
chunk_id BIGINT NOT NULL,
room_id TEXT NOT NULL
);
CREATE UNIQUE INDEX chunk_linearized_first_id ON chunk_linearized_first (room_id);
INSERT into background_updates (update_name, progress_json)
VALUES ('event_fields_chunk_id', '{}');
"""
def run_create(cur, database_engine, *args, **kwargs):
for statement in get_statements(SQL.splitlines()):
cur.execute(statement)
txn = LoggingTransaction(
cur, "schema_update", database_engine, [], [],
)
rows = SQLBaseStore._simple_select_list_txn(
txn,
table="event_forward_extremities",
keyvalues={},
retcols=("event_id", "room_id",),
)
next_chunk_id = 1
room_to_next_order = {}
prev_chunks_by_room = {}
for row in rows:
chunk_id = next_chunk_id
next_chunk_id += 1
room_id = row["room_id"]
event_id = row["event_id"]
SQLBaseStore._simple_update_txn(
txn,
table="events",
keyvalues={"room_id": room_id, "event_id": event_id},
updatevalues={"chunk_id": chunk_id},
)
ordering = room_to_next_order.get(room_id, 1)
room_to_next_order[room_id] = ordering + 1
prev_chunks = prev_chunks_by_room.setdefault(room_id, [])
SQLBaseStore._simple_insert_txn(
txn,
table="chunk_linearized",
values={
"chunk_id": chunk_id,
"room_id": row["room_id"],
"numerator": ordering,
"denominator": 1,
},
)
if prev_chunks:
SQLBaseStore._simple_update_one_txn(
txn,
table="chunk_linearized",
keyvalues={"chunk_id": prev_chunks[-1]},
updatevalues={"next_chunk_id": chunk_id},
)
else:
SQLBaseStore._simple_insert_txn(
txn,
table="chunk_linearized_first",
values={
"chunk_id": chunk_id,
"room_id": row["room_id"],
},
)
prev_chunks.append(chunk_id)
def run_upgrade(*args, **kwargs):
pass

View file

@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
from synapse.storage.engines import PostgresEngine
import abc
@ -62,24 +63,25 @@ _TOPOLOGICAL_TOKEN = "topological"
# Used as return values for pagination APIs
_EventDictReturn = namedtuple("_EventDictReturn", (
"event_id", "topological_ordering", "stream_ordering",
"event_id", "chunk_id", "topological_ordering", "stream_ordering",
))
def lower_bound(token, engine, inclusive=False):
inclusive = "=" if inclusive else ""
if token.topological is None:
if token.chunk is None:
return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
else:
if isinstance(engine, PostgresEngine):
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
# use the later form when running against postgres.
return "((%d,%d) <%s (%s,%s))" % (
token.topological, token.stream, inclusive,
return "(chunk_id = %d AND (%d,%d) <%s (%s,%s))" % (
token.chunk, token.topological, token.stream, inclusive,
"topological_ordering", "stream_ordering",
)
return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
return "(chunk_id = %d AND (%d < %s OR (%d = %s AND %d <%s %s)))" % (
token.chunk,
token.topological, "topological_ordering",
token.topological, "topological_ordering",
token.stream, inclusive, "stream_ordering",
@ -88,18 +90,19 @@ def lower_bound(token, engine, inclusive=False):
def upper_bound(token, engine, inclusive=True):
inclusive = "=" if inclusive else ""
if token.topological is None:
if token.chunk is None:
return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
else:
if isinstance(engine, PostgresEngine):
# Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
# as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
# use the later form when running against postgres.
return "((%d,%d) >%s (%s,%s))" % (
token.topological, token.stream, inclusive,
return "(chunk_id = %d AND (%d,%d) >%s (%s,%s))" % (
token.chunk, token.topological, token.stream, inclusive,
"topological_ordering", "stream_ordering",
)
return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
return "(chunk_id = %d AND (%d > %s OR (%d = %s AND %d >%s %s)))" % (
token.chunk,
token.topological, "topological_ordering",
token.topological, "topological_ordering",
token.stream, inclusive, "stream_ordering",
@ -275,7 +278,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit))
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
return rows
rows = yield self.runInteraction("get_room_events_stream_for_room", f)
@ -325,7 +328,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
txn.execute(sql, (user_id, from_id, to_id,))
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
return rows
@ -392,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
end_token = RoomStreamToken.parse(end_token)
rows, token = yield self.runInteraction(
rows, token, _ = yield self.runInteraction(
"get_recent_event_ids_for_room", self._paginate_room_events_txn,
room_id, from_token=end_token, limit=limit,
)
@ -437,15 +440,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
`room_id` causes it to return the current room specific topological
token.
"""
token = yield self.get_room_max_stream_ordering()
if room_id is None:
defer.returnValue("s%d" % (token,))
token = yield self.get_room_max_stream_ordering()
defer.returnValue(str(RoomStreamToken(None, None, token)))
else:
topo = yield self.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn,
token = yield self.runInteraction(
"get_room_events_max_id", self._get_topological_token_for_room_txn,
room_id,
)
defer.returnValue("t%d-%d" % (topo, token))
if not token:
raise Exception("Server not in room")
defer.returnValue(str(token))
def get_stream_token_for_event(self, event_id):
"""The stream token for an event
@ -460,7 +465,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
table="events",
keyvalues={"event_id": event_id},
retcol="stream_ordering",
).addCallback(lambda row: "s%d" % (row,))
).addCallback(lambda row: str(RoomStreamToken(None, None, row)))
def get_topological_token_for_event(self, event_id):
"""The stream token for an event
@ -469,16 +474,34 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
Raises:
StoreError if the event wasn't in the database.
Returns:
A deferred "t%d-%d" topological token.
A deferred topological token.
"""
return self._simple_select_one(
table="events",
keyvalues={"event_id": event_id},
retcols=("stream_ordering", "topological_ordering"),
retcols=("stream_ordering", "topological_ordering", "chunk_id"),
desc="get_topological_token_for_event",
).addCallback(lambda row: "t%d-%d" % (
row["topological_ordering"], row["stream_ordering"],)
)
).addCallback(lambda row: str(RoomStreamToken(
row["chunk_id"],
row["topological_ordering"],
row["stream_ordering"],
)))
def _get_topological_token_for_room_txn(self, txn, room_id):
sql = """
SELECT chunk_id, topological_ordering, stream_ordering
FROM events
NATURAL JOIN event_forward_extremities
WHERE room_id = ?
ORDER BY stream_ordering DESC
LIMIT 1
"""
txn.execute(sql, (room_id,))
row = txn.fetchone()
if row:
c, t, s = row
return RoomStreamToken(c, t, s)
return None
def get_max_topological_token(self, room_id, stream_key):
sql = (
@ -515,18 +538,25 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
null topological_ordering.
"""
for event, row in zip(events, rows):
chunk = row.chunk_id
topo = row.topological_ordering
stream = row.stream_ordering
if topo_order and row.topological_ordering:
topo = row.topological_ordering
else:
topo = None
internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream))
internal.order = (
int(topo) if topo else 0,
int(stream),
)
if topo_order and chunk:
internal.before = str(RoomStreamToken(chunk, topo, stream - 1))
internal.after = str(RoomStreamToken(chunk, topo, stream))
internal.order = (
int(chunk) if chunk else 0,
int(topo) if topo else 0,
int(stream),
)
else:
internal.before = str(RoomStreamToken(None, None, stream - 1))
internal.after = str(RoomStreamToken(None, None, stream))
internal.order = (
0, 0, int(stream),
)
@defer.inlineCallbacks
def get_events_around(self, room_id, event_id, before_limit, after_limit):
@ -586,27 +616,29 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"event_id": event_id,
"room_id": room_id,
},
retcols=["stream_ordering", "topological_ordering"],
retcols=["stream_ordering", "topological_ordering", "chunk_id"],
)
# Paginating backwards includes the event at the token, but paginating
# forward doesn't.
before_token = RoomStreamToken(
results["topological_ordering"] - 1,
results["stream_ordering"],
results["chunk_id"],
results["topological_ordering"],
results["stream_ordering"] - 1,
)
after_token = RoomStreamToken(
results["chunk_id"],
results["topological_ordering"],
results["stream_ordering"],
)
rows, start_token = self._paginate_room_events_txn(
rows, start_token, _ = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
)
events_before = [r.event_id for r in rows]
rows, end_token = self._paginate_room_events_txn(
rows, end_token, _ = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
)
events_after = [r.event_id for r in rows]
@ -689,13 +721,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
those that match the filter.
Returns:
Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
as a list of _EventDictReturn and a token that points to the end
of the result set.
Deferred[tuple[list[_EventDictReturn], str, list[int]]: Returns
the results as a list of _EventDictReturn, a token that points to
the end of the result set, and a list of chunks iterated over.
"""
assert int(limit) >= 0
# For backwards compatibility we need to check if the token has a
# topological part but no chunk part. If that's the case we can use the
# stream part to generate an appropriate topological token.
if from_token.chunk is None and from_token.topological is not None:
res = self._simple_select_one_txn(
txn,
table="events",
keyvalues={
"stream_ordering": from_token.stream,
},
retcols=(
"chunk_id",
"topological_ordering",
"stream_ordering",
),
allow_none=True,
)
if res and res["chunk_id"] is not None:
from_token = RoomStreamToken(
res["chunk_id"],
res["topological_ordering"],
res["stream_ordering"],
)
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
@ -725,10 +781,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
bounds += " AND " + filter_clause
args.extend(filter_args)
args.append(int(limit))
limit = int(limit)
args.append(limit)
sql = (
"SELECT event_id, topological_ordering, stream_ordering"
"SELECT event_id, chunk_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
@ -740,9 +797,59 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn.execute(sql, args)
rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
iterated_chunks = []
chunk_id = None
if rows:
chunk_id = rows[-1].chunk_id
iterated_chunks = [r.chunk_id for r in rows]
elif from_token.chunk:
chunk_id = from_token.chunk
iterated_chunks = [chunk_id]
table = ChunkDBOrderedListStore(
txn, room_id, self.clock, self.database_engine,
)
while chunk_id and (limit <= 0 or len(rows) < limit):
if chunk_id not in iterated_chunks:
iterated_chunks.append(chunk_id)
if direction == 'b':
# FIXME: There may be multiple things here
chunk_id = table.get_prev(chunk_id)
else:
chunk_id = table.get_next(chunk_id)
if chunk_id is None:
break
sql = (
"SELECT event_id, chunk_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE outlier = ? AND room_id = ? AND chunk_id = %(chunk_id)d"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s LIMIT ?"
) % {
"chunk_id": chunk_id,
"order": order,
}
txn.execute(sql, args)
new_rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
if not new_rows:
break
rows.extend(new_rows)
if limit > 0:
rows = rows[:limit]
if rows:
chunk = rows[-1].chunk_id
topo = rows[-1].topological_ordering
toke = rows[-1].stream_ordering
if direction == 'b':
@ -752,12 +859,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = RoomStreamToken(topo, toke)
next_token = RoomStreamToken(chunk, topo, toke)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token
return rows, str(next_token),
return rows, str(next_token), iterated_chunks,
@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
@ -779,16 +886,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "topological_ordering" and "stream_orderign".
the keys "event_id", "topological_ordering" and "stream_ordering".
"""
from_key = RoomStreamToken.parse(from_key)
if to_key:
to_key = RoomStreamToken.parse(to_key)
rows, token = yield self.runInteraction(
"paginate_room_events", self._paginate_room_events_txn,
room_id, from_key, to_key, direction, limit, event_filter,
def _do_paginate_room_events(txn):
rows, token, chunks = self._paginate_room_events_txn(
txn, room_id, from_key, to_key, direction, limit, event_filter,
)
extremities = []
seen = set()
for chunk_id in chunks:
if chunk_id in seen:
continue
seen.add(chunk_id)
event_ids = self._simple_select_onecol_txn(
txn,
table="chunk_backwards_extremities",
keyvalues={"chunk_id": chunk_id},
retcol="event_id"
)
extremities.extend(e for e in event_ids if e not in extremities)
return rows, token, extremities
rows, token, extremities = yield self.runInteraction(
"paginate_room_events", _do_paginate_room_events,
)
events = yield self._get_events(
@ -798,7 +927,49 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._set_before_and_after(events, rows)
defer.returnValue((events, token))
defer.returnValue((events, token, extremities))
def clamp_token_before(self, room_id, token, clamp_to):
token = RoomStreamToken.parse(token)
clamp_to = RoomStreamToken.parse(clamp_to)
def clamp_token_before_txn(txn, token):
if not token.topological:
sql = """
SELECT chunk_id, topological_ordering FROM events
WHERE room_id = ? AND stream_ordering <= ?
ORDER BY stream_ordering DESC
"""
txn.execute(sql, (room_id, token.stream,))
row = txn.fetchone()
if not row:
return str(token)
chunk_id, topo = row
token = RoomStreamToken(chunk_id, topo, token.stream)
if token.chunk == clamp_to.chunk:
if token.topological < clamp_to.topological:
return str(token)
else:
return str(clamp_to)
sql = "SELECT rationale FROM chunk_linearized WHERE chunk_id = ?"
txn.execute(sql, (token.chunk,))
token_order, = txn.fetchone()
txn.execute(sql, (clamp_to.chunk,))
clamp_order, = txn.fetchone()
if token_order < clamp_order:
return str(token)
else:
return str(clamp_to)
return self.runInteraction(
"clamp_token_before", clamp_token_before_txn, token
)
class StreamStore(StreamWorkerStore):

View file

@ -306,7 +306,7 @@ StreamToken.START = StreamToken(
)
class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
class RoomStreamToken(namedtuple("_StreamToken", "chunk topological stream")):
"""Tokens are positions between events. The token "s1" comes after event 1.
s0 s1
@ -334,10 +334,17 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
def parse(cls, string):
try:
if string[0] == 's':
return cls(topological=None, stream=int(string[1:]))
if string[0] == 't':
return cls(chunk=None, topological=None, stream=int(string[1:]))
if string[0] == 't': # For backwards compat with older tokens.
parts = string[1:].split('-', 1)
return cls(topological=int(parts[0]), stream=int(parts[1]))
return cls(chunk=None, topological=int(parts[0]), stream=int(parts[1]))
if string[0] == 'c':
parts = string[1:].split('~', 2)
return cls(
chunk=int(parts[0]),
topological=int(parts[1]),
stream=int(parts[2]),
)
except Exception:
pass
raise SynapseError(400, "Invalid token %r" % (string,))
@ -346,12 +353,14 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
def parse_stream_token(cls, string):
try:
if string[0] == 's':
return cls(topological=None, stream=int(string[1:]))
return cls(chunk=None, topological=None, stream=int(string[1:]))
except Exception:
pass
raise SynapseError(400, "Invalid token %r" % (string,))
def __str__(self):
if self.chunk is not None:
return "c%d~%d~%d" % (self.chunk, self.topological, self.stream)
if self.topological is not None:
return "t%d-%d" % (self.topological, self.stream)
else:

View file

@ -0,0 +1,349 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains an implementation of the Katriel-Bodlaender algorithm,
which is used to do online topological ordering of graphs.
Note that the ordering derived from the graph is such that the source node of
an edge comes before the target node of the edge, i.e. a graph of A -> B -> C
would produce the ordering [A, B, C].
This ordering is therefore opposite to what one might expect when considering
the room DAG, as newer messages would be added to the start rather than the
end.
***The ChunkDBOrderedListStore therefore inverts the direction of edges***
See:
A tight analysis of the KatrielBodlaender algorithm for online topological
ordering
Hsiao-Fei Liua and Kun-Mao Chao
https://www.sciencedirect.com/science/article/pii/S0304397507006573
and:
Online Topological Ordering
Irit Katriel and Hans L. Bodlaender
http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.78.7933 )
"""
from abc import ABCMeta, abstractmethod
class OrderedListStore(object):
"""An abstract base class that is used to store a graph and maintain a
topological consistent, total ordering.
Internally this uses the Katriel-Bodlaender algorithm, which requires the
store expose an interface for the total ordering that supports:
- Insertion of the node into the ordering either immediately before or
after another node.
- Deletion of the node from the ordering
- Comparing the relative ordering of two arbitary nodes
- Get the node immediately before or after a given node in the ordering
It also needs to be able to interact with the graph in the following ways:
- Query the number of edges from a node in the graph
- Query the number of edges into a node in the graph
- Add an edge to the graph
Users of subclasses should call `add_node` and `add_edge` whenever editing
the graph. The total ordering exposed will remain constant until the next
call to one of these methods.
Note: Calls to `add_node` and `add_edge` cannot overlap, and so callers
should perform some form of locking.
"""
__metaclass__ = ABCMeta
def add_node(self, node_id):
"""Adds a node to the graph.
Args:
node_id (str)
"""
self._insert_before(node_id, None)
def add_edge(self, source, target):
"""Adds a new edge to the graph and updates the ordering.
See module level docs.
Note that both the source and target nodes must have been inserted into
the store (at an arbitrary position) already.
Args:
source (str): The source node of the new edge
target (str): The target node of the new edge
"""
# The following is the Katriel-Bodlaender algorithm.
to_s = []
from_t = []
to_s_neighbours = []
from_t_neighbours = []
to_s_indegree = 0
from_t_outdegree = 0
s = source
t = target
while s and t and not self.is_before(s, t):
m_s = to_s_indegree
m_t = from_t_outdegree
# These functions return a tuple where the first term is a float
# that can be used to order the the list of neighbours.
# These are valid until the next write
pe_s = self.get_nodes_with_edges_to(s)
fe_t = self.get_nodes_with_edges_from(t)
for n, _ in pe_s:
assert n not in to_s
for n, _ in fe_t:
assert n not in from_t
l_s = len(pe_s)
l_t = len(fe_t)
if m_s + l_s <= m_t + l_t:
to_s.append(s)
to_s_neighbours.extend(pe_s)
to_s_indegree += l_s
if to_s_neighbours:
to_s_neighbours = list(set(to_s_neighbours))
to_s_neighbours.sort()
_, s = to_s_neighbours.pop()
else:
s = None
if m_s + l_s >= m_t + l_t:
from_t.append(t)
from_t_neighbours.extend(fe_t)
from_t_outdegree += l_t
if from_t_neighbours:
from_t_neighbours = list(set(from_t_neighbours))
from_t_neighbours.sort(reverse=True)
_, t = from_t_neighbours.pop()
else:
t = None
if s is None:
s = self.get_prev(target)
if t is None:
t = self.get_next(source)
for node_id in to_s:
self._delete_ordering(node_id)
while to_s:
s1 = to_s.pop()
self._insert_after(s1, s)
s = s1
for node_id in from_t:
self._delete_ordering(node_id)
while from_t:
t1 = from_t.pop()
self._insert_before(t1, t)
t = t1
self._add_edge_to_graph(source, target)
@abstractmethod
def is_before(self, first_node, second_node):
"""Returns whether the first node is before the second node.
Args:
first_node (str)
second_node (str)
Returns:
bool: True if first_node is before second_node
"""
pass
@abstractmethod
def get_prev(self, node_id):
"""Gets the node immediately before the given node in the topological
ordering.
Args:
node_id (str)
Returns:
str|None: A node ID or None if no preceding node exists
"""
pass
@abstractmethod
def get_next(self, node_id):
"""Gets the node immediately after the given node in the topological
ordering.
Args:
node_id (str)
Returns:
str|None: A node ID or None if no proceding node exists
"""
pass
@abstractmethod
def get_nodes_with_edges_to(self, node_id):
"""Get all nodes with edges to the given node
Args:
node_id (str)
Returns:
list[tuple[float, str]]: Returns a list of tuple of an ordering
term and the node ID. The ordering term can be used to sort the
returned list.
The ordering is valid until subsequent calls to `add_edge`
functions
"""
pass
@abstractmethod
def get_nodes_with_edges_from(self, node_id):
"""Get all nodes with edges from the given node
Args:
node_id (str)
Returns:
list[tuple[float, str]]: Returns a list of tuple of an ordering
term and the node ID. The ordering term can be used to sort the
returned list.
The ordering is valid until subsequent calls to `add_edge`
functions
"""
pass
@abstractmethod
def _insert_before(self, node_id, target_id):
"""Inserts node immediately before target node.
If target_id is None then the node is inserted at the end of the list
Args:
node_id (str)
target_id (str|None)
"""
pass
@abstractmethod
def _insert_after(self, node_id, target_id):
"""Inserts node immediately after target node.
If target_id is None then the node is inserted at the start of the list
Args:
node_id (str)
target_id (str|None)
"""
pass
@abstractmethod
def _delete_ordering(self, node_id):
"""Deletes the given node from the ordered list (but not the graph).
Used when we want to reinsert it into a different position
Args:
node_id (str)
"""
pass
@abstractmethod
def _add_edge_to_graph(self, source_id, target_id):
"""Adds an edge to the graph from source to target.
Does not update ordering.
Args:
source_id (str)
target_id (str)
"""
pass
class InMemoryOrderedListStore(OrderedListStore):
"""An in memory OrderedListStore
"""
def __init__(self):
# The ordered list of nodes
self.list = []
# Map from node to set of nodes that it references
self.edges_from = {}
# Map from node to set of nodes that it is referenced by
self.edges_to = {}
def is_before(self, first_node, second_node):
return self.list.index(first_node) < self.list.index(second_node)
def get_prev(self, node_id):
idx = self.list.index(node_id) - 1
if idx >= 0:
return self.list[idx]
else:
return None
def get_next(self, node_id):
idx = self.list.index(node_id) + 1
if idx < len(self.list):
return self.list[idx]
else:
return None
def _insert_before(self, node_id, target_id):
if target_id is not None:
idx = self.list.index(target_id)
self.list.insert(idx, node_id)
else:
self.list.append(node_id)
def _insert_after(self, node_id, target_id):
if target_id is not None:
idx = self.list.index(target_id) + 1
self.list.insert(idx, node_id)
else:
self.list.insert(0, node_id)
def _delete_ordering(self, node_id):
self.list.remove(node_id)
def get_nodes_with_edges_to(self, node_id):
to_nodes = self.edges_to.get(node_id, [])
return [(self.list.index(nid), nid) for nid in to_nodes]
def get_nodes_with_edges_from(self, node_id):
from_nodes = self.edges_from.get(node_id, [])
return [(self.list.index(nid), nid) for nid in from_nodes]
def _add_edge_to_graph(self, source_id, target_id):
self.edges_from.setdefault(source_id, set()).add(target_id)
self.edges_to.setdefault(target_id, set()).add(source_id)

View file

@ -0,0 +1,241 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
import random
import tests.unittest
import tests.utils
from gmpy2 import mpq as Fraction
from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
"""Tests to ensure that the ordering and rebalancing functions of
ChunkDBOrderedListStore work as expected.
"""
def __init__(self, *args, **kwargs):
super(ChunkLinearizerStoreTestCase, self).__init__(*args, **kwargs)
@defer.inlineCallbacks
def setUp(self):
hs = yield tests.utils.setup_test_homeserver()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@defer.inlineCallbacks
def test_simple_insert_fetch(self):
room_id = "foo_room1"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 100,
)
table.add_node("A")
table._insert_after("B", "A")
table._insert_before("C", "A")
table._insert_after("D", "A")
sql = """
SELECT chunk_id, numerator, denominator FROM chunk_linearized
WHERE room_id = ?
"""
txn.execute(sql, (room_id,))
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
ordered = [c for _, c in ordered]
self.assertEqual(["C", "A", "D", "B"], ordered)
yield self.store.runInteraction("test", test_txn)
@defer.inlineCallbacks
def test_many_insert_fetch(self):
room_id = "foo_room2"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 100,
)
nodes = [(i, "node_%d" % (i,)) for i in xrange(1, 1000)]
expected = [n for _, n in nodes]
already_inserted = []
random.shuffle(nodes)
while nodes:
i, node_id = nodes.pop()
if not already_inserted:
table.add_node(node_id)
else:
for j, target_id in already_inserted:
if j > i:
break
if j < i:
table._insert_after(node_id, target_id)
else:
table._insert_before(node_id, target_id)
already_inserted.append((i, node_id))
already_inserted.sort()
sql = """
SELECT chunk_id, numerator, denominator FROM chunk_linearized
WHERE room_id = ?
"""
txn.execute(sql, (room_id,))
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
ordered = [c for _, c in ordered]
self.assertEqual(expected, ordered)
yield self.store.runInteraction("test", test_txn)
@defer.inlineCallbacks
def test_prepend_and_append(self):
room_id = "foo_room3"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 1000,
)
table.add_node("a")
expected = ["a"]
for i in xrange(1, 1000):
node_id = "node_id_before_%d" % i
table._insert_before(node_id, expected[0])
expected.insert(0, node_id)
for i in xrange(1, 1000):
node_id = "node_id_after_%d" % i
table._insert_after(node_id, expected[-1])
expected.append(node_id)
sql = """
SELECT chunk_id, numerator, denominator FROM chunk_linearized
WHERE room_id = ?
"""
txn.execute(sql, (room_id,))
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
ordered = [c for _, c in ordered]
self.assertEqual(expected, ordered)
yield self.store.runInteraction("test", test_txn)
@defer.inlineCallbacks
def test_worst_case(self):
room_id = "foo_room3"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 100,
)
table.add_node("a")
prev_node = "a"
expected_prefix = ["a"]
expected_suffix = []
for i in xrange(1, 100):
node_id = "node_id_%d" % i
if i % 2 == 0:
table._insert_before(node_id, prev_node)
expected_prefix.append(node_id)
else:
table._insert_after(node_id, prev_node)
expected_suffix.append(node_id)
prev_node = node_id
sql = """
SELECT chunk_id, numerator, denominator FROM chunk_linearized
WHERE room_id = ?
"""
txn.execute(sql, (room_id,))
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
ordered = [c for _, c in ordered]
expected = expected_prefix + list(reversed(expected_suffix))
self.assertEqual(expected, ordered)
yield self.store.runInteraction("test", test_txn)
@defer.inlineCallbacks
def test_get_edges_to(self):
room_id = "foo_room4"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 100,
)
table.add_node("A")
table._insert_after("B", "A")
table._add_edge_to_graph("A", "B")
table._insert_before("C", "A")
table._add_edge_to_graph("C", "A")
nodes = table.get_nodes_with_edges_from("A")
self.assertEqual([n for _, n in nodes], ["B"])
nodes = table.get_nodes_with_edges_to("A")
self.assertEqual([n for _, n in nodes], ["C"])
yield self.store.runInteraction("test", test_txn)
@defer.inlineCallbacks
def test_get_next_and_prev(self):
room_id = "foo_room5"
def test_txn(txn):
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
self.store.database_engine,
5, 100,
)
table.add_node("A")
table._insert_after("B", "A")
table._insert_before("C", "A")
self.assertEqual(table.get_next("A"), "B")
self.assertEqual(table.get_prev("A"), "C")
yield self.store.runInteraction("test", test_txn)

View file

@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.util.katriel_bodlaender import InMemoryOrderedListStore
from tests import unittest
class KatrielBodlaenderTests(unittest.TestCase):
def test_simple_graph(self):
store = InMemoryOrderedListStore()
nodes = [
"node_1",
"node_2",
"node_3",
"node_4",
]
for node in nodes:
store.add_node(node)
store.add_edge("node_2", "node_3")
store.add_edge("node_1", "node_2")
store.add_edge("node_3", "node_4")
self.assertEqual(nodes, store.list)
def test_reverse_graph(self):
store = InMemoryOrderedListStore()
nodes = [
"node_1",
"node_2",
"node_3",
"node_4",
]
for node in nodes:
store.add_node(node)
store.add_edge("node_3", "node_2")
store.add_edge("node_2", "node_1")
store.add_edge("node_4", "node_3")
self.assertEqual(list(reversed(nodes)), store.list)
def test_divergent_graph(self):
store = InMemoryOrderedListStore()
nodes = [
"node_1",
"node_2",
"node_3",
"node_4",
"node_5",
"node_6",
]
for node in reversed(nodes):
store.add_node(node)
store.add_edge("node_2", "node_3")
store.add_edge("node_2", "node_5")
store.add_edge("node_1", "node_2")
store.add_edge("node_3", "node_4")
store.add_edge("node_1", "node_3")
store.add_edge("node_4", "node_5")
store.add_edge("node_5", "node_6")
store.add_edge("node_4", "node_6")
self.assertEqual(nodes, store.list)