forked from MirrorHub/synapse
Time how long it takes us to do backfill processing (#13535)
This commit is contained in:
parent
2c8cfd6d85
commit
088bcb7ecb
3 changed files with 90 additions and 16 deletions
1
changelog.d/13535.misc
Normal file
1
changelog.d/13535.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add metrics to time how long it takes us to do backfill processing (`synapse_federation_backfill_processing_before_time_seconds`, `synapse_federation_backfill_processing_after_time_seconds`).
|
|
@ -32,6 +32,7 @@ from typing import (
|
|||
)
|
||||
|
||||
import attr
|
||||
from prometheus_client import Histogram
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
from signedjson.sign import verify_signed_json
|
||||
from unpaddedbase64 import decode_base64
|
||||
|
@ -79,6 +80,24 @@ if TYPE_CHECKING:
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Added to debug performance and track progress on optimizations
|
||||
backfill_processing_before_timer = Histogram(
|
||||
"synapse_federation_backfill_processing_before_time_seconds",
|
||||
"sec",
|
||||
[],
|
||||
buckets=(
|
||||
1.0,
|
||||
5.0,
|
||||
10.0,
|
||||
20.0,
|
||||
30.0,
|
||||
40.0,
|
||||
60.0,
|
||||
80.0,
|
||||
"+Inf",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
|
||||
"""Get joined domains from state
|
||||
|
@ -138,6 +157,7 @@ class FederationHandler:
|
|||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self._state_storage_controller = self._storage_controllers.state
|
||||
|
@ -197,12 +217,39 @@ class FederationHandler:
|
|||
return. This is used as part of the heuristic to decide if we
|
||||
should back paginate.
|
||||
"""
|
||||
# Starting the processing time here so we can include the room backfill
|
||||
# linearizer lock queue in the timing
|
||||
processing_start_time = self.clock.time_msec()
|
||||
|
||||
async with self._room_backfill.queue(room_id):
|
||||
return await self._maybe_backfill_inner(room_id, current_depth, limit)
|
||||
return await self._maybe_backfill_inner(
|
||||
room_id,
|
||||
current_depth,
|
||||
limit,
|
||||
processing_start_time=processing_start_time,
|
||||
)
|
||||
|
||||
async def _maybe_backfill_inner(
|
||||
self, room_id: str, current_depth: int, limit: int
|
||||
self,
|
||||
room_id: str,
|
||||
current_depth: int,
|
||||
limit: int,
|
||||
*,
|
||||
processing_start_time: int,
|
||||
) -> bool:
|
||||
"""
|
||||
Checks whether the `current_depth` is at or approaching any backfill
|
||||
points in the room and if so, will backfill. We only care about
|
||||
checking backfill points that happened before the `current_depth`
|
||||
(meaning less than or equal to the `current_depth`).
|
||||
|
||||
Args:
|
||||
room_id: The room to backfill in.
|
||||
current_depth: The depth to check at for any upcoming backfill points.
|
||||
limit: The max number of events to request from the remote federated server.
|
||||
processing_start_time: The time when `maybe_backfill` started
|
||||
processing. Only used for timing.
|
||||
"""
|
||||
backwards_extremities = [
|
||||
_BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
|
||||
for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
|
||||
|
@ -433,6 +480,11 @@ class FederationHandler:
|
|||
|
||||
return False
|
||||
|
||||
processing_end_time = self.clock.time_msec()
|
||||
backfill_processing_before_timer.observe(
|
||||
(processing_start_time - processing_end_time) / 1000
|
||||
)
|
||||
|
||||
success = await try_backfill(likely_domains)
|
||||
if success:
|
||||
return True
|
||||
|
|
|
@ -29,7 +29,7 @@ from typing import (
|
|||
Tuple,
|
||||
)
|
||||
|
||||
from prometheus_client import Counter
|
||||
from prometheus_client import Counter, Histogram
|
||||
|
||||
from synapse import event_auth
|
||||
from synapse.api.constants import (
|
||||
|
@ -98,6 +98,26 @@ soft_failed_event_counter = Counter(
|
|||
"Events received over federation that we marked as soft_failed",
|
||||
)
|
||||
|
||||
# Added to debug performance and track progress on optimizations
|
||||
backfill_processing_after_timer = Histogram(
|
||||
"synapse_federation_backfill_processing_after_time_seconds",
|
||||
"sec",
|
||||
[],
|
||||
buckets=(
|
||||
1.0,
|
||||
5.0,
|
||||
10.0,
|
||||
20.0,
|
||||
30.0,
|
||||
40.0,
|
||||
60.0,
|
||||
80.0,
|
||||
120.0,
|
||||
180.0,
|
||||
"+Inf",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class FederationEventHandler:
|
||||
"""Handles events that originated from federation.
|
||||
|
@ -604,20 +624,21 @@ class FederationEventHandler:
|
|||
if not events:
|
||||
return
|
||||
|
||||
# if there are any events in the wrong room, the remote server is buggy and
|
||||
# should not be trusted.
|
||||
for ev in events:
|
||||
if ev.room_id != room_id:
|
||||
raise InvalidResponseError(
|
||||
f"Remote server {dest} returned event {ev.event_id} which is in "
|
||||
f"room {ev.room_id}, when we were backfilling in {room_id}"
|
||||
)
|
||||
with backfill_processing_after_timer.time():
|
||||
# if there are any events in the wrong room, the remote server is buggy and
|
||||
# should not be trusted.
|
||||
for ev in events:
|
||||
if ev.room_id != room_id:
|
||||
raise InvalidResponseError(
|
||||
f"Remote server {dest} returned event {ev.event_id} which is in "
|
||||
f"room {ev.room_id}, when we were backfilling in {room_id}"
|
||||
)
|
||||
|
||||
await self._process_pulled_events(
|
||||
dest,
|
||||
events,
|
||||
backfilled=True,
|
||||
)
|
||||
await self._process_pulled_events(
|
||||
dest,
|
||||
events,
|
||||
backfilled=True,
|
||||
)
|
||||
|
||||
@trace
|
||||
async def _get_missing_events_for_pdu(
|
||||
|
|
Loading…
Reference in a new issue