forked from MirrorHub/synapse
Merge pull request #979 from matrix-org/erikj/state_ids_api
Add /state_ids federation API
This commit is contained in:
commit
1b5436ad78
4 changed files with 151 additions and 1 deletions
synapse/federation
|
@ -314,6 +314,40 @@ class FederationClient(FederationBase):
|
||||||
Deferred: Results in a list of PDUs.
|
Deferred: Results in a list of PDUs.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
# First we try and ask for just the IDs, as thats far quicker if
|
||||||
|
# we have most of the state and auth_chain already.
|
||||||
|
# However, this may 404 if the other side has an old synapse.
|
||||||
|
result = yield self.transport_layer.get_room_state_ids(
|
||||||
|
destination, room_id, event_id=event_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
state_event_ids = result["pdu_ids"]
|
||||||
|
auth_event_ids = result.get("auth_chain_ids", [])
|
||||||
|
|
||||||
|
fetched_events, failed_to_fetch = yield self.get_events(
|
||||||
|
[destination], room_id, set(state_event_ids + auth_event_ids)
|
||||||
|
)
|
||||||
|
|
||||||
|
if failed_to_fetch:
|
||||||
|
logger.warn("Failed to get %r", failed_to_fetch)
|
||||||
|
|
||||||
|
event_map = {
|
||||||
|
ev.event_id: ev for ev in fetched_events
|
||||||
|
}
|
||||||
|
|
||||||
|
pdus = [event_map[e_id] for e_id in state_event_ids]
|
||||||
|
auth_chain = [event_map[e_id] for e_id in auth_event_ids]
|
||||||
|
|
||||||
|
auth_chain.sort(key=lambda e: e.depth)
|
||||||
|
|
||||||
|
defer.returnValue((pdus, auth_chain))
|
||||||
|
except HttpResponseException as e:
|
||||||
|
if e.code == 404:
|
||||||
|
logger.info("Failed to use get_room_state_ids API, falling back")
|
||||||
|
else:
|
||||||
|
raise e
|
||||||
|
|
||||||
result = yield self.transport_layer.get_room_state(
|
result = yield self.transport_layer.get_room_state(
|
||||||
destination, room_id, event_id=event_id,
|
destination, room_id, event_id=event_id,
|
||||||
)
|
)
|
||||||
|
@ -339,6 +373,67 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
defer.returnValue((signed_pdus, signed_auth))
|
defer.returnValue((signed_pdus, signed_auth))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_events(self, destinations, room_id, event_ids, return_local=True):
|
||||||
|
"""Fetch events from some remote destinations, checking if we already
|
||||||
|
have them.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
destinations (list)
|
||||||
|
room_id (str)
|
||||||
|
event_ids (list)
|
||||||
|
return_local (bool): Whether to include events we already have in
|
||||||
|
the DB in the returned list of events
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred: A deferred resolving to a 2-tuple where the first is a list of
|
||||||
|
events and the second is a list of event ids that we failed to fetch.
|
||||||
|
"""
|
||||||
|
if return_local:
|
||||||
|
seen_events = yield self.store.get_events(event_ids)
|
||||||
|
signed_events = seen_events.values()
|
||||||
|
else:
|
||||||
|
seen_events = yield self.store.have_events(event_ids)
|
||||||
|
signed_events = []
|
||||||
|
|
||||||
|
failed_to_fetch = set()
|
||||||
|
|
||||||
|
missing_events = set(event_ids)
|
||||||
|
for k in seen_events:
|
||||||
|
missing_events.discard(k)
|
||||||
|
|
||||||
|
if not missing_events:
|
||||||
|
defer.returnValue((signed_events, failed_to_fetch))
|
||||||
|
|
||||||
|
def random_server_list():
|
||||||
|
srvs = list(destinations)
|
||||||
|
random.shuffle(srvs)
|
||||||
|
return srvs
|
||||||
|
|
||||||
|
batch_size = 20
|
||||||
|
missing_events = list(missing_events)
|
||||||
|
for i in xrange(0, len(missing_events), batch_size):
|
||||||
|
batch = set(missing_events[i:i + batch_size])
|
||||||
|
|
||||||
|
deferreds = [
|
||||||
|
self.get_pdu(
|
||||||
|
destinations=random_server_list(),
|
||||||
|
event_id=e_id,
|
||||||
|
)
|
||||||
|
for e_id in batch
|
||||||
|
]
|
||||||
|
|
||||||
|
res = yield defer.DeferredList(deferreds, consumeErrors=True)
|
||||||
|
for success, result in res:
|
||||||
|
if success:
|
||||||
|
signed_events.append(result)
|
||||||
|
batch.discard(result.event_id)
|
||||||
|
|
||||||
|
# We removed all events we successfully fetched from `batch`
|
||||||
|
failed_to_fetch.update(batch)
|
||||||
|
|
||||||
|
defer.returnValue((signed_events, failed_to_fetch))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def get_event_auth(self, destination, room_id, event_id):
|
def get_event_auth(self, destination, room_id, event_id):
|
||||||
|
|
|
@ -214,6 +214,27 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
defer.returnValue((200, resp))
|
defer.returnValue((200, resp))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_state_ids_request(self, origin, room_id, event_id):
|
||||||
|
if not event_id:
|
||||||
|
raise NotImplementedError("Specify an event")
|
||||||
|
|
||||||
|
in_room = yield self.auth.check_host_in_room(room_id, origin)
|
||||||
|
if not in_room:
|
||||||
|
raise AuthError(403, "Host not in room.")
|
||||||
|
|
||||||
|
pdus = yield self.handler.get_state_for_pdu(
|
||||||
|
room_id, event_id,
|
||||||
|
)
|
||||||
|
auth_chain = yield self.store.get_auth_chain(
|
||||||
|
[pdu.event_id for pdu in pdus]
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue((200, {
|
||||||
|
"pdu_ids": [pdu.event_id for pdu in pdus],
|
||||||
|
"auth_chain_ids": [pdu.event_id for pdu in auth_chain],
|
||||||
|
}))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _on_context_state_request_compute(self, room_id, event_id):
|
def _on_context_state_request_compute(self, room_id, event_id):
|
||||||
pdus = yield self.handler.get_state_for_pdu(
|
pdus = yield self.handler.get_state_for_pdu(
|
||||||
|
@ -584,7 +605,7 @@ class FederationServer(FederationBase):
|
||||||
origin, pdu.room_id, pdu.event_id,
|
origin, pdu.room_id, pdu.event_id,
|
||||||
)
|
)
|
||||||
except:
|
except:
|
||||||
logger.warn("Failed to get state for event: %s", pdu.event_id)
|
logger.exception("Failed to get state for event: %s", pdu.event_id)
|
||||||
|
|
||||||
yield self.handler.on_receive_pdu(
|
yield self.handler.on_receive_pdu(
|
||||||
origin,
|
origin,
|
||||||
|
|
|
@ -54,6 +54,28 @@ class TransportLayerClient(object):
|
||||||
destination, path=path, args={"event_id": event_id},
|
destination, path=path, args={"event_id": event_id},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@log_function
|
||||||
|
def get_room_state_ids(self, destination, room_id, event_id):
|
||||||
|
""" Requests all state for a given room from the given server at the
|
||||||
|
given event. Returns the state's event_id's
|
||||||
|
|
||||||
|
Args:
|
||||||
|
destination (str): The host name of the remote home server we want
|
||||||
|
to get the state from.
|
||||||
|
context (str): The name of the context we want the state of
|
||||||
|
event_id (str): The event we want the context at.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred: Results in a dict received from the remote homeserver.
|
||||||
|
"""
|
||||||
|
logger.debug("get_room_state_ids dest=%s, room=%s",
|
||||||
|
destination, room_id)
|
||||||
|
|
||||||
|
path = PREFIX + "/state_ids/%s/" % room_id
|
||||||
|
return self.client.get_json(
|
||||||
|
destination, path=path, args={"event_id": event_id},
|
||||||
|
)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def get_event(self, destination, event_id, timeout=None):
|
def get_event(self, destination, event_id, timeout=None):
|
||||||
""" Requests the pdu with give id and origin from the given server.
|
""" Requests the pdu with give id and origin from the given server.
|
||||||
|
|
|
@ -271,6 +271,17 @@ class FederationStateServlet(BaseFederationServlet):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class FederationStateIdsServlet(BaseFederationServlet):
|
||||||
|
PATH = "/state_ids/(?P<room_id>[^/]*)/"
|
||||||
|
|
||||||
|
def on_GET(self, origin, content, query, room_id):
|
||||||
|
return self.handler.on_state_ids_request(
|
||||||
|
origin,
|
||||||
|
room_id,
|
||||||
|
query.get("event_id", [None])[0],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class FederationBackfillServlet(BaseFederationServlet):
|
class FederationBackfillServlet(BaseFederationServlet):
|
||||||
PATH = "/backfill/(?P<context>[^/]*)/"
|
PATH = "/backfill/(?P<context>[^/]*)/"
|
||||||
|
|
||||||
|
@ -536,6 +547,7 @@ SERVLET_CLASSES = (
|
||||||
FederationPullServlet,
|
FederationPullServlet,
|
||||||
FederationEventServlet,
|
FederationEventServlet,
|
||||||
FederationStateServlet,
|
FederationStateServlet,
|
||||||
|
FederationStateIdsServlet,
|
||||||
FederationBackfillServlet,
|
FederationBackfillServlet,
|
||||||
FederationQueryServlet,
|
FederationQueryServlet,
|
||||||
FederationMakeJoinServlet,
|
FederationMakeJoinServlet,
|
||||||
|
|
Loading…
Add table
Reference in a new issue