forked from MirrorHub/synapse
make FederationClient.backfill async
This commit is contained in:
parent
5d17c31596
commit
0536d0c9be
1 changed files with 11 additions and 15 deletions
|
@ -17,7 +17,7 @@
|
||||||
import copy
|
import copy
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Iterable
|
from typing import Dict, Iterable, List, Tuple
|
||||||
|
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ from synapse.api.room_versions import (
|
||||||
EventFormatVersions,
|
EventFormatVersions,
|
||||||
RoomVersions,
|
RoomVersions,
|
||||||
)
|
)
|
||||||
from synapse.events import builder, room_version_to_event_format
|
from synapse.events import EventBase, builder, room_version_to_event_format
|
||||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||||
from synapse.logging.context import make_deferred_yieldable
|
from synapse.logging.context import make_deferred_yieldable
|
||||||
from synapse.logging.utils import log_function
|
from synapse.logging.utils import log_function
|
||||||
|
@ -170,21 +170,17 @@ class FederationClient(FederationBase):
|
||||||
sent_queries_counter.labels("client_one_time_keys").inc()
|
sent_queries_counter.labels("client_one_time_keys").inc()
|
||||||
return self.transport_layer.claim_client_keys(destination, content, timeout)
|
return self.transport_layer.claim_client_keys(destination, content, timeout)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
async def backfill(
|
||||||
@log_function
|
self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
|
||||||
def backfill(self, dest, room_id, limit, extremities):
|
) -> List[EventBase]:
|
||||||
"""Requests some more historic PDUs for the given context from the
|
"""Requests some more historic PDUs for the given room from the
|
||||||
given destination server.
|
given destination server.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
dest (str): The remote homeserver to ask.
|
dest (str): The remote homeserver to ask.
|
||||||
room_id (str): The room_id to backfill.
|
room_id (str): The room_id to backfill.
|
||||||
limit (int): The maximum number of PDUs to return.
|
limit (int): The maximum number of events to return.
|
||||||
extremities (list): List of PDU id and origins of the first pdus
|
extremities (list): our current backwards extremities, to backfill from
|
||||||
we have seen from the context
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Deferred: Results in the received PDUs.
|
|
||||||
"""
|
"""
|
||||||
logger.debug("backfill extrem=%s", extremities)
|
logger.debug("backfill extrem=%s", extremities)
|
||||||
|
|
||||||
|
@ -192,13 +188,13 @@ class FederationClient(FederationBase):
|
||||||
if not extremities:
|
if not extremities:
|
||||||
return
|
return
|
||||||
|
|
||||||
transaction_data = yield self.transport_layer.backfill(
|
transaction_data = await self.transport_layer.backfill(
|
||||||
dest, room_id, extremities, limit
|
dest, room_id, extremities, limit
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("backfill transaction_data=%r", transaction_data)
|
logger.debug("backfill transaction_data=%r", transaction_data)
|
||||||
|
|
||||||
room_version = yield self.store.get_room_version_id(room_id)
|
room_version = await self.store.get_room_version_id(room_id)
|
||||||
format_ver = room_version_to_event_format(room_version)
|
format_ver = room_version_to_event_format(room_version)
|
||||||
|
|
||||||
pdus = [
|
pdus = [
|
||||||
|
@ -207,7 +203,7 @@ class FederationClient(FederationBase):
|
||||||
]
|
]
|
||||||
|
|
||||||
# FIXME: We should handle signature failures more gracefully.
|
# FIXME: We should handle signature failures more gracefully.
|
||||||
pdus[:] = yield make_deferred_yieldable(
|
pdus[:] = await make_deferred_yieldable(
|
||||||
defer.gatherResults(
|
defer.gatherResults(
|
||||||
self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
|
self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
|
||||||
).addErrback(unwrapFirstError)
|
).addErrback(unwrapFirstError)
|
||||||
|
|
Loading…
Reference in a new issue