forked from MirrorHub/synapse
Check if we've already backfilled events
This commit is contained in:
parent
790f5848b2
commit
cccf86dd05
2 changed files with 36 additions and 7 deletions
|
@ -294,6 +294,15 @@ class FederationHandler(BaseHandler):
|
||||||
extremities=extremities,
|
extremities=extremities,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
seen_events = yield self.store.have_events_in_timeline(
|
||||||
|
set(e.event_id for e in events)
|
||||||
|
)
|
||||||
|
|
||||||
|
events = [e for e in events if e.event_id not in seen_events]
|
||||||
|
|
||||||
|
if not events:
|
||||||
|
defer.returnValue([])
|
||||||
|
|
||||||
event_map = {e.event_id: e for e in events}
|
event_map = {e.event_id: e for e in events}
|
||||||
|
|
||||||
event_ids = set(e.event_id for e in events)
|
event_ids = set(e.event_id for e in events)
|
||||||
|
@ -353,6 +362,7 @@ class FederationHandler(BaseHandler):
|
||||||
for a in auth_events.values():
|
for a in auth_events.values():
|
||||||
if a.event_id in seen_events:
|
if a.event_id in seen_events:
|
||||||
continue
|
continue
|
||||||
|
a.internal_metadata.outlier = True
|
||||||
ev_infos.append({
|
ev_infos.append({
|
||||||
"event": a,
|
"event": a,
|
||||||
"auth_events": {
|
"auth_events": {
|
||||||
|
@ -373,6 +383,11 @@ class FederationHandler(BaseHandler):
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
yield self._handle_new_events(
|
||||||
|
dest, ev_infos,
|
||||||
|
backfilled=True,
|
||||||
|
)
|
||||||
|
|
||||||
events.sort(key=lambda e: e.depth)
|
events.sort(key=lambda e: e.depth)
|
||||||
|
|
||||||
for event in events:
|
for event in events:
|
||||||
|
@ -383,10 +398,9 @@ class FederationHandler(BaseHandler):
|
||||||
"event": event,
|
"event": event,
|
||||||
})
|
})
|
||||||
|
|
||||||
yield self._handle_new_events(
|
yield self._handle_new_event(
|
||||||
dest, ev_infos,
|
dest, event
|
||||||
backfilled=True,
|
)
|
||||||
)
|
|
||||||
|
|
||||||
defer.returnValue(events)
|
defer.returnValue(events)
|
||||||
|
|
||||||
|
@ -458,11 +472,12 @@ class FederationHandler(BaseHandler):
|
||||||
# TODO: Should we try multiple of these at a time?
|
# TODO: Should we try multiple of these at a time?
|
||||||
for dom in domains:
|
for dom in domains:
|
||||||
try:
|
try:
|
||||||
events = yield self.backfill(
|
yield self.backfill(
|
||||||
dom, room_id,
|
dom, room_id,
|
||||||
limit=100,
|
limit=100,
|
||||||
extremities=[e for e in extremities.keys()]
|
extremities=[e for e in extremities.keys()]
|
||||||
)
|
)
|
||||||
|
defer.returnValue(True)
|
||||||
except SynapseError as e:
|
except SynapseError as e:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Failed to backfill from %s because %s",
|
"Failed to backfill from %s because %s",
|
||||||
|
@ -488,8 +503,6 @@ class FederationHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if events:
|
|
||||||
defer.returnValue(True)
|
|
||||||
defer.returnValue(False)
|
defer.returnValue(False)
|
||||||
|
|
||||||
success = yield try_backfill(likely_domains)
|
success = yield try_backfill(likely_domains)
|
||||||
|
|
|
@ -543,6 +543,22 @@ class EventsStore(SQLBaseStore):
|
||||||
(event.event_id, event.redacts)
|
(event.event_id, event.redacts)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def have_events_in_timeline(self, event_ids):
|
||||||
|
"""Given a list of event ids, check if we have already processed and
|
||||||
|
stored them as non outliers.
|
||||||
|
"""
|
||||||
|
rows = yield self._simple_select_many_batch(
|
||||||
|
table="events",
|
||||||
|
retcols=("event_id",),
|
||||||
|
column="event_id",
|
||||||
|
iterable=list(event_ids),
|
||||||
|
keyvalues={"outlier": False},
|
||||||
|
desc="have_events_in_timeline",
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue(set(r["event_id"] for r in rows))
|
||||||
|
|
||||||
def have_events(self, event_ids):
|
def have_events(self, event_ids):
|
||||||
"""Given a list of event ids, check if we have already processed them.
|
"""Given a list of event ids, check if we have already processed them.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue