forked from MirrorHub/synapse
Merge pull request #162 from matrix-org/erikj/backfill_fixes
backfill fixes
This commit is contained in:
commit
5ebd33302f
3 changed files with 153 additions and 94 deletions
|
@ -165,10 +165,11 @@ class FederationClient(FederationBase):
|
|||
for p in transaction_data["pdus"]
|
||||
]
|
||||
|
||||
for i, pdu in enumerate(pdus):
|
||||
pdus[i] = yield self._check_sigs_and_hash(pdu)
|
||||
|
||||
# FIXME: We should handle signature failures more gracefully.
|
||||
pdus[:] = yield defer.gatherResults(
|
||||
[self._check_sigs_and_hash(pdu) for pdu in pdus],
|
||||
consumeErrors=True,
|
||||
).addErrback(unwrapFirstError)
|
||||
|
||||
defer.returnValue(pdus)
|
||||
|
||||
|
|
|
@ -230,27 +230,65 @@ class FederationHandler(BaseHandler):
|
|||
if not extremities:
|
||||
extremities = yield self.store.get_oldest_events_in_room(room_id)
|
||||
|
||||
pdus = yield self.replication_layer.backfill(
|
||||
events = yield self.replication_layer.backfill(
|
||||
dest,
|
||||
room_id,
|
||||
limit,
|
||||
limit=limit,
|
||||
extremities=extremities,
|
||||
)
|
||||
|
||||
events = []
|
||||
event_map = {e.event_id: e for e in events}
|
||||
|
||||
for pdu in pdus:
|
||||
event = pdu
|
||||
event_ids = set(e.event_id for e in events)
|
||||
|
||||
# FIXME (erikj): Not sure this actually works :/
|
||||
context = yield self.state_handler.compute_event_context(event)
|
||||
edges = [
|
||||
ev.event_id
|
||||
for ev in events
|
||||
if set(e_id for e_id, _ in ev.prev_events) - event_ids
|
||||
]
|
||||
|
||||
events.append((event, context))
|
||||
# For each edge get the current state.
|
||||
|
||||
yield self.store.persist_event(
|
||||
event,
|
||||
context=context,
|
||||
backfilled=True
|
||||
auth_events = {}
|
||||
events_to_state = {}
|
||||
for e_id in edges:
|
||||
state, auth = yield self.replication_layer.get_state_for_room(
|
||||
destination=dest,
|
||||
room_id=room_id,
|
||||
event_id=e_id
|
||||
)
|
||||
auth_events.update({a.event_id: a for a in auth})
|
||||
events_to_state[e_id] = state
|
||||
|
||||
yield defer.gatherResults(
|
||||
[
|
||||
self._handle_new_event(dest, a)
|
||||
for a in auth_events.values()
|
||||
],
|
||||
consumeErrors=True,
|
||||
).addErrback(unwrapFirstError)
|
||||
|
||||
yield defer.gatherResults(
|
||||
[
|
||||
self._handle_new_event(
|
||||
dest, event_map[e_id],
|
||||
state=events_to_state[e_id],
|
||||
backfilled=True,
|
||||
)
|
||||
for e_id in events_to_state
|
||||
],
|
||||
consumeErrors=True
|
||||
).addErrback(unwrapFirstError)
|
||||
|
||||
events.sort(key=lambda e: e.depth)
|
||||
|
||||
for event in events:
|
||||
if event in events_to_state:
|
||||
continue
|
||||
|
||||
yield self._handle_new_event(
|
||||
dest, event,
|
||||
backfilled=True,
|
||||
)
|
||||
|
||||
defer.returnValue(events)
|
||||
|
@ -347,7 +385,7 @@ class FederationHandler(BaseHandler):
|
|||
logger.info(e.message)
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.warn(
|
||||
logger.exception(
|
||||
"Failed to backfill from %s because %s",
|
||||
dom, e,
|
||||
)
|
||||
|
@ -517,54 +555,9 @@ class FederationHandler(BaseHandler):
|
|||
# FIXME
|
||||
pass
|
||||
|
||||
auth_ids_to_deferred = {}
|
||||
|
||||
def process_auth_ev(ev):
|
||||
auth_ids = [e_id for e_id, _ in ev.auth_events]
|
||||
|
||||
prev_ds = [
|
||||
auth_ids_to_deferred[i]
|
||||
for i in auth_ids
|
||||
if i in auth_ids_to_deferred
|
||||
]
|
||||
|
||||
d = defer.Deferred()
|
||||
|
||||
auth_ids_to_deferred[ev.event_id] = d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def f(*_):
|
||||
ev.internal_metadata.outlier = True
|
||||
|
||||
try:
|
||||
auth = {
|
||||
(e.type, e.state_key): e for e in auth_chain
|
||||
if e.event_id in auth_ids
|
||||
}
|
||||
|
||||
yield self._handle_new_event(
|
||||
origin, ev, auth_events=auth
|
||||
yield self._handle_auth_events(
|
||||
origin, [e for e in auth_chain if e.event_id != event.event_id]
|
||||
)
|
||||
except:
|
||||
logger.exception(
|
||||
"Failed to handle auth event %s",
|
||||
ev.event_id,
|
||||
)
|
||||
|
||||
d.callback(None)
|
||||
|
||||
if prev_ds:
|
||||
dx = defer.DeferredList(prev_ds)
|
||||
dx.addBoth(f)
|
||||
else:
|
||||
f()
|
||||
|
||||
for e in auth_chain:
|
||||
if e.event_id == event.event_id:
|
||||
return
|
||||
process_auth_ev(e)
|
||||
|
||||
yield defer.DeferredList(auth_ids_to_deferred.values())
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_state(e):
|
||||
|
@ -1348,3 +1341,52 @@ class FederationHandler(BaseHandler):
|
|||
},
|
||||
"missing": [e.event_id for e in missing_locals],
|
||||
})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_auth_events(self, origin, auth_events):
|
||||
auth_ids_to_deferred = {}
|
||||
|
||||
def process_auth_ev(ev):
|
||||
auth_ids = [e_id for e_id, _ in ev.auth_events]
|
||||
|
||||
prev_ds = [
|
||||
auth_ids_to_deferred[i]
|
||||
for i in auth_ids
|
||||
if i in auth_ids_to_deferred
|
||||
]
|
||||
|
||||
d = defer.Deferred()
|
||||
|
||||
auth_ids_to_deferred[ev.event_id] = d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def f(*_):
|
||||
ev.internal_metadata.outlier = True
|
||||
|
||||
try:
|
||||
auth = {
|
||||
(e.type, e.state_key): e for e in auth_events
|
||||
if e.event_id in auth_ids
|
||||
}
|
||||
|
||||
yield self._handle_new_event(
|
||||
origin, ev, auth_events=auth
|
||||
)
|
||||
except:
|
||||
logger.exception(
|
||||
"Failed to handle auth event %s",
|
||||
ev.event_id,
|
||||
)
|
||||
|
||||
d.callback(None)
|
||||
|
||||
if prev_ds:
|
||||
dx = defer.DeferredList(prev_ds)
|
||||
dx.addBoth(f)
|
||||
else:
|
||||
f()
|
||||
|
||||
for e in auth_events:
|
||||
process_auth_ev(e)
|
||||
|
||||
yield defer.DeferredList(auth_ids_to_deferred.values())
|
||||
|
|
|
@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached
|
|||
from syutil.base64util import encode_base64
|
||||
|
||||
import logging
|
||||
from Queue import PriorityQueue, Empty
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -363,7 +364,11 @@ class EventFederationStore(SQLBaseStore):
|
|||
return self.runInteraction(
|
||||
"get_backfill_events",
|
||||
self._get_backfill_events, room_id, event_list, limit
|
||||
).addCallback(self._get_events)
|
||||
).addCallback(
|
||||
self._get_events
|
||||
).addCallback(
|
||||
lambda l: sorted(l, key=lambda e: -e.depth)
|
||||
)
|
||||
|
||||
def _get_backfill_events(self, txn, room_id, event_list, limit):
|
||||
logger.debug(
|
||||
|
@ -371,43 +376,54 @@ class EventFederationStore(SQLBaseStore):
|
|||
room_id, repr(event_list), limit
|
||||
)
|
||||
|
||||
event_results = event_list
|
||||
event_results = set()
|
||||
|
||||
front = event_list
|
||||
# We want to make sure that we do a breadth-first, "depth" ordered
|
||||
# search.
|
||||
|
||||
query = (
|
||||
"SELECT prev_event_id FROM event_edges "
|
||||
"WHERE room_id = ? AND event_id = ? "
|
||||
"SELECT depth, prev_event_id FROM event_edges"
|
||||
" INNER JOIN events"
|
||||
" ON prev_event_id = events.event_id"
|
||||
" AND event_edges.room_id = events.room_id"
|
||||
" WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
|
||||
" AND event_edges.is_state = ?"
|
||||
" LIMIT ?"
|
||||
)
|
||||
|
||||
# We iterate through all event_ids in `front` to select their previous
|
||||
# events. These are dumped in `new_front`.
|
||||
# We continue until we reach the limit *or* new_front is empty (i.e.,
|
||||
# we've run out of things to select
|
||||
while front and len(event_results) < limit:
|
||||
queue = PriorityQueue()
|
||||
|
||||
new_front = []
|
||||
for event_id in front:
|
||||
logger.debug(
|
||||
"_backfill_interaction: id=%s",
|
||||
event_id
|
||||
for event_id in event_list:
|
||||
depth = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="events",
|
||||
keyvalues={
|
||||
"event_id": event_id,
|
||||
},
|
||||
retcol="depth"
|
||||
)
|
||||
|
||||
queue.put((-depth, event_id))
|
||||
|
||||
while not queue.empty() and len(event_results) < limit:
|
||||
try:
|
||||
_, event_id = queue.get_nowait()
|
||||
except Empty:
|
||||
break
|
||||
|
||||
if event_id in event_results:
|
||||
continue
|
||||
|
||||
event_results.add(event_id)
|
||||
|
||||
txn.execute(
|
||||
query,
|
||||
(room_id, event_id, limit - len(event_results))
|
||||
(room_id, event_id, False, limit - len(event_results))
|
||||
)
|
||||
|
||||
for row in txn.fetchall():
|
||||
logger.debug(
|
||||
"_backfill_interaction: got id=%s",
|
||||
*row
|
||||
)
|
||||
new_front.append(row[0])
|
||||
|
||||
front = new_front
|
||||
event_results += new_front
|
||||
if row[1] not in event_results:
|
||||
queue.put((-row[0], row[1]))
|
||||
|
||||
return event_results
|
||||
|
||||
|
|
Loading…
Reference in a new issue