mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 02:13:52 +01:00
Initial attempt at sprinkling some @metrics.counted decorations around the federation code
This commit is contained in:
parent
a594087f06
commit
9470412316
2 changed files with 28 additions and 0 deletions
|
@ -25,6 +25,7 @@ from synapse.api.errors import (
|
||||||
from synapse.util.expiringcache import ExpiringCache
|
from synapse.util.expiringcache import ExpiringCache
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.events import FrozenEvent
|
from synapse.events import FrozenEvent
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
|
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
|
||||||
|
|
||||||
|
@ -35,6 +36,8 @@ import random
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
|
|
||||||
class FederationClient(FederationBase):
|
class FederationClient(FederationBase):
|
||||||
|
|
||||||
|
@ -50,6 +53,7 @@ class FederationClient(FederationBase):
|
||||||
self._get_pdu_cache.start()
|
self._get_pdu_cache.start()
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def send_pdu(self, pdu, destinations):
|
def send_pdu(self, pdu, destinations):
|
||||||
"""Informs the replication layer about a new PDU generated within the
|
"""Informs the replication layer about a new PDU generated within the
|
||||||
home server that should be transmitted to others.
|
home server that should be transmitted to others.
|
||||||
|
@ -77,6 +81,7 @@ class FederationClient(FederationBase):
|
||||||
)
|
)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def send_edu(self, destination, edu_type, content):
|
def send_edu(self, destination, edu_type, content):
|
||||||
edu = Edu(
|
edu = Edu(
|
||||||
origin=self.server_name,
|
origin=self.server_name,
|
||||||
|
@ -90,11 +95,13 @@ class FederationClient(FederationBase):
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def send_failure(self, failure, destination):
|
def send_failure(self, failure, destination):
|
||||||
self._transaction_queue.enqueue_failure(failure, destination)
|
self._transaction_queue.enqueue_failure(failure, destination)
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def make_query(self, destination, query_type, args,
|
def make_query(self, destination, query_type, args,
|
||||||
retry_on_dns_fail=True):
|
retry_on_dns_fail=True):
|
||||||
"""Sends a federation Query to a remote homeserver of the given type
|
"""Sends a federation Query to a remote homeserver of the given type
|
||||||
|
@ -156,6 +163,7 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def get_pdu(self, destinations, event_id, outlier=False):
|
def get_pdu(self, destinations, event_id, outlier=False):
|
||||||
"""Requests the PDU with given origin and ID from the remote home
|
"""Requests the PDU with given origin and ID from the remote home
|
||||||
servers.
|
servers.
|
||||||
|
@ -245,6 +253,7 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def get_state_for_room(self, destination, room_id, event_id):
|
def get_state_for_room(self, destination, room_id, event_id):
|
||||||
"""Requests all of the `current` state PDUs for a given room from
|
"""Requests all of the `current` state PDUs for a given room from
|
||||||
a remote home server.
|
a remote home server.
|
||||||
|
@ -285,6 +294,7 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def get_event_auth(self, destination, room_id, event_id):
|
def get_event_auth(self, destination, room_id, event_id):
|
||||||
res = yield self.transport_layer.get_event_auth(
|
res = yield self.transport_layer.get_event_auth(
|
||||||
destination, room_id, event_id,
|
destination, room_id, event_id,
|
||||||
|
@ -304,6 +314,7 @@ class FederationClient(FederationBase):
|
||||||
defer.returnValue(signed_auth)
|
defer.returnValue(signed_auth)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@metrics.counted
|
||||||
def make_join(self, destinations, room_id, user_id):
|
def make_join(self, destinations, room_id, user_id):
|
||||||
for destination in destinations:
|
for destination in destinations:
|
||||||
try:
|
try:
|
||||||
|
@ -330,6 +341,7 @@ class FederationClient(FederationBase):
|
||||||
raise RuntimeError("Failed to send to any server.")
|
raise RuntimeError("Failed to send to any server.")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@metrics.counted
|
||||||
def send_join(self, destinations, pdu):
|
def send_join(self, destinations, pdu):
|
||||||
for destination in destinations:
|
for destination in destinations:
|
||||||
try:
|
try:
|
||||||
|
@ -379,6 +391,7 @@ class FederationClient(FederationBase):
|
||||||
raise RuntimeError("Failed to send to any server.")
|
raise RuntimeError("Failed to send to any server.")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@metrics.counted
|
||||||
def send_invite(self, destination, room_id, event_id, pdu):
|
def send_invite(self, destination, room_id, event_id, pdu):
|
||||||
time_now = self._clock.time_msec()
|
time_now = self._clock.time_msec()
|
||||||
code, content = yield self.transport_layer.send_invite(
|
code, content = yield self.transport_layer.send_invite(
|
||||||
|
@ -402,6 +415,7 @@ class FederationClient(FederationBase):
|
||||||
defer.returnValue(pdu)
|
defer.returnValue(pdu)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@metrics.counted
|
||||||
def query_auth(self, destination, room_id, event_id, local_auth):
|
def query_auth(self, destination, room_id, event_id, local_auth):
|
||||||
"""
|
"""
|
||||||
Params:
|
Params:
|
||||||
|
|
|
@ -22,6 +22,7 @@ from .units import Transaction, Edu
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.logcontext import PreserveLoggingContext
|
from synapse.util.logcontext import PreserveLoggingContext
|
||||||
from synapse.events import FrozenEvent
|
from synapse.events import FrozenEvent
|
||||||
|
import synapse.metrics
|
||||||
|
|
||||||
from synapse.api.errors import FederationError, SynapseError
|
from synapse.api.errors import FederationError, SynapseError
|
||||||
|
|
||||||
|
@ -32,6 +33,8 @@ import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
|
|
||||||
class FederationServer(FederationBase):
|
class FederationServer(FederationBase):
|
||||||
def set_handler(self, handler):
|
def set_handler(self, handler):
|
||||||
|
@ -72,6 +75,7 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def on_backfill_request(self, origin, room_id, versions, limit):
|
def on_backfill_request(self, origin, room_id, versions, limit):
|
||||||
pdus = yield self.handler.on_backfill_request(
|
pdus = yield self.handler.on_backfill_request(
|
||||||
origin, room_id, versions, limit
|
origin, room_id, versions, limit
|
||||||
|
@ -81,6 +85,7 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def on_incoming_transaction(self, transaction_data):
|
def on_incoming_transaction(self, transaction_data):
|
||||||
transaction = Transaction(**transaction_data)
|
transaction = Transaction(**transaction_data)
|
||||||
|
|
||||||
|
@ -160,6 +165,7 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def on_context_state_request(self, origin, room_id, event_id):
|
def on_context_state_request(self, origin, room_id, event_id):
|
||||||
if event_id:
|
if event_id:
|
||||||
pdus = yield self.handler.get_state_for_pdu(
|
pdus = yield self.handler.get_state_for_pdu(
|
||||||
|
@ -187,6 +193,7 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def on_pdu_request(self, origin, event_id):
|
def on_pdu_request(self, origin, event_id):
|
||||||
pdu = yield self._get_persisted_pdu(origin, event_id)
|
pdu = yield self._get_persisted_pdu(origin, event_id)
|
||||||
|
|
||||||
|
@ -199,10 +206,12 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
|
@metrics.counted
|
||||||
def on_pull_request(self, origin, versions):
|
def on_pull_request(self, origin, versions):
|
||||||
raise NotImplementedError("Pull transactions not implemented")
|
raise NotImplementedError("Pull transactions not implemented")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@metrics.counted
|
||||||
def on_query_request(self, query_type, args):
|
def on_query_request(self, query_type, args):
|
||||||
if query_type in self.query_handlers:
|
if query_type in self.query_handlers:
|
||||||
response = yield self.query_handlers[query_type](args)
|
response = yield self.query_handlers[query_type](args)
|
||||||
|
@ -213,12 +222,14 @@ class FederationServer(FederationBase):
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@metrics.counted
|
||||||
def on_make_join_request(self, room_id, user_id):
|
def on_make_join_request(self, room_id, user_id):
|
||||||
pdu = yield self.handler.on_make_join_request(room_id, user_id)
|
pdu = yield self.handler.on_make_join_request(room_id, user_id)
|
||||||
time_now = self._clock.time_msec()
|
time_now = self._clock.time_msec()
|
||||||
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
|
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@metrics.counted
|
||||||
def on_invite_request(self, origin, content):
|
def on_invite_request(self, origin, content):
|
||||||
pdu = self.event_from_pdu_json(content)
|
pdu = self.event_from_pdu_json(content)
|
||||||
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
|
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
|
||||||
|
@ -226,6 +237,7 @@ class FederationServer(FederationBase):
|
||||||
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
|
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@metrics.counted
|
||||||
def on_send_join_request(self, origin, content):
|
def on_send_join_request(self, origin, content):
|
||||||
logger.debug("on_send_join_request: content: %s", content)
|
logger.debug("on_send_join_request: content: %s", content)
|
||||||
pdu = self.event_from_pdu_json(content)
|
pdu = self.event_from_pdu_json(content)
|
||||||
|
@ -240,6 +252,7 @@ class FederationServer(FederationBase):
|
||||||
}))
|
}))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@metrics.counted
|
||||||
def on_event_auth(self, origin, room_id, event_id):
|
def on_event_auth(self, origin, room_id, event_id):
|
||||||
time_now = self._clock.time_msec()
|
time_now = self._clock.time_msec()
|
||||||
auth_pdus = yield self.handler.on_event_auth(event_id)
|
auth_pdus = yield self.handler.on_event_auth(event_id)
|
||||||
|
@ -248,6 +261,7 @@ class FederationServer(FederationBase):
|
||||||
}))
|
}))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@metrics.counted
|
||||||
def on_query_auth_request(self, origin, content, event_id):
|
def on_query_auth_request(self, origin, content, event_id):
|
||||||
"""
|
"""
|
||||||
Content is a dict with keys::
|
Content is a dict with keys::
|
||||||
|
|
Loading…
Reference in a new issue