0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-11-09 11:32:01 +01:00

Merge pull request #100 from matrix-org/missing_pdu_compat

Handle if get_missing_pdu returns 400 or not all events.
This commit is contained in:
Erik Johnston 2015-03-05 16:42:15 +00:00
commit 12bcf3d179
2 changed files with 114 additions and 15 deletions

View file

@ -19,14 +19,18 @@ from twisted.internet import defer
from .federation_base import FederationBase from .federation_base import FederationBase
from .units import Edu from .units import Edu
from synapse.api.errors import CodeMessageException, SynapseError from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
)
from synapse.util.expiringcache import ExpiringCache from synapse.util.expiringcache import ExpiringCache
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
import itertools
import logging import logging
import random
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -440,11 +444,30 @@ class FederationClient(FederationBase):
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_missing_events(self, destination, room_id, earliest_events, def get_missing_events(self, destination, room_id, earliest_events_ids,
latest_events, limit, min_depth): latest_events, limit, min_depth):
"""Tries to fetch events we are missing. This is called when we receive
an event without having received all of its ancestors.
Args:
destination (str)
room_id (str)
earliest_events_ids (list): List of event ids. Effectively the
events we expected to receive, but haven't. `get_missing_events`
should only return events that didn't happen before these.
latest_events (list): List of events we have received that we don't
have all previous events for.
limit (int): Maximum number of events to return.
min_depth (int): Minimum depth of events tor return.
"""
try:
content = yield self.transport_layer.get_missing_events( content = yield self.transport_layer.get_missing_events(
destination, room_id, earliest_events, latest_events, limit, destination=destination,
min_depth, room_id=room_id,
earliest_events=earliest_events_ids,
latest_events=[e.event_id for e in latest_events],
limit=limit,
min_depth=min_depth,
) )
events = [ events = [
@ -456,6 +479,78 @@ class FederationClient(FederationBase):
destination, events, outlier=True destination, events, outlier=True
) )
have_gotten_all_from_destination = True
except HttpResponseException as e:
if not e.code == 400:
raise
# We are probably hitting an old server that doesn't support
# get_missing_events
signed_events = []
have_gotten_all_from_destination = False
if len(signed_events) >= limit:
defer.returnValue(signed_events)
servers = yield self.store.get_joined_hosts_for_room(room_id)
servers = set(servers)
servers.discard(self.server_name)
failed_to_fetch = set()
while len(signed_events) < limit:
# Are we missing any?
seen_events = set(earliest_events_ids)
seen_events.update(e.event_id for e in signed_events)
missing_events = {}
for e in itertools.chain(latest_events, signed_events):
if e.depth > min_depth:
missing_events.update({
e_id: e.depth for e_id, _ in e.prev_events
if e_id not in seen_events
and e_id not in failed_to_fetch
})
if not missing_events:
break
have_seen = yield self.store.have_events(missing_events)
for k in have_seen:
missing_events.pop(k, None)
if not missing_events:
break
# Okay, we haven't gotten everything yet. Lets get them.
ordered_missing = sorted(missing_events.items(), key=lambda x: x[0])
if have_gotten_all_from_destination:
servers.discard(destination)
def random_server_list():
srvs = list(servers)
random.shuffle(srvs)
return srvs
deferreds = [
self.get_pdu(
destinations=random_server_list(),
event_id=e_id,
)
for e_id, depth in ordered_missing[:limit - len(signed_events)]
]
res = yield defer.DeferredList(deferreds, consumeErrors=True)
for (result, val), (e_id, _) in zip(res, ordered_missing):
if result:
signed_events.append(val)
else:
failed_to_fetch.add(e_id)
defer.returnValue(signed_events) defer.returnValue(signed_events)
def event_from_pdu_json(self, pdu_json, outlier=False): def event_from_pdu_json(self, pdu_json, outlier=False):

View file

@ -413,12 +413,16 @@ class FederationServer(FederationBase):
missing_events = yield self.get_missing_events( missing_events = yield self.get_missing_events(
origin, origin,
pdu.room_id, pdu.room_id,
earliest_events=list(latest), earliest_events_ids=list(latest),
latest_events=[pdu.event_id], latest_events=[pdu],
limit=10, limit=10,
min_depth=min_depth, min_depth=min_depth,
) )
# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
for e in missing_events: for e in missing_events:
yield self._handle_new_pdu( yield self._handle_new_pdu(
origin, origin,