mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-21 01:13:59 +01:00
Generalise the @cancellable
annotation so it can be used on functions other than just servlet methods. (#13662)
This commit is contained in:
parent
a160406d24
commit
7bc110a19e
10 changed files with 75 additions and 75 deletions
1
changelog.d/13662.misc
Normal file
1
changelog.d/13662.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Generalise the `@cancellable` annotation so it can be used on functions other than just servlet methods.
|
|
@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tupl
|
||||||
|
|
||||||
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
|
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
|
||||||
from synapse.api.urls import FEDERATION_V1_PREFIX
|
from synapse.api.urls import FEDERATION_V1_PREFIX
|
||||||
from synapse.http.server import HttpServer, ServletCallback, is_method_cancellable
|
from synapse.http.server import HttpServer, ServletCallback
|
||||||
from synapse.http.servlet import parse_json_object_from_request
|
from synapse.http.servlet import parse_json_object_from_request
|
||||||
from synapse.http.site import SynapseRequest
|
from synapse.http.site import SynapseRequest
|
||||||
from synapse.logging.context import run_in_background
|
from synapse.logging.context import run_in_background
|
||||||
|
@ -34,6 +34,7 @@ from synapse.logging.opentracing import (
|
||||||
whitelisted_homeserver,
|
whitelisted_homeserver,
|
||||||
)
|
)
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
|
from synapse.util.cancellation import is_function_cancellable
|
||||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||||
from synapse.util.stringutils import parse_and_validate_server_name
|
from synapse.util.stringutils import parse_and_validate_server_name
|
||||||
|
|
||||||
|
@ -375,7 +376,7 @@ class BaseFederationServlet:
|
||||||
if code is None:
|
if code is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if is_method_cancellable(code):
|
if is_function_cancellable(code):
|
||||||
# The wrapper added by `self._wrap` will inherit the cancellable flag,
|
# The wrapper added by `self._wrap` will inherit the cancellable flag,
|
||||||
# but the wrapper itself does not support cancellation yet.
|
# but the wrapper itself does not support cancellation yet.
|
||||||
# Once resolved, the cancellation tests in
|
# Once resolved, the cancellation tests in
|
||||||
|
|
|
@ -33,7 +33,6 @@ from typing import (
|
||||||
Optional,
|
Optional,
|
||||||
Pattern,
|
Pattern,
|
||||||
Tuple,
|
Tuple,
|
||||||
TypeVar,
|
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -64,6 +63,7 @@ from synapse.logging.context import defer_to_thread, preserve_fn, run_in_backgro
|
||||||
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
|
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
|
||||||
from synapse.util import json_encoder
|
from synapse.util import json_encoder
|
||||||
from synapse.util.caches import intern_dict
|
from synapse.util.caches import intern_dict
|
||||||
|
from synapse.util.cancellation import is_function_cancellable
|
||||||
from synapse.util.iterutils import chunk_seq
|
from synapse.util.iterutils import chunk_seq
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -94,68 +94,6 @@ HTML_ERROR_TEMPLATE = """<!DOCTYPE html>
|
||||||
HTTP_STATUS_REQUEST_CANCELLED = 499
|
HTTP_STATUS_REQUEST_CANCELLED = 499
|
||||||
|
|
||||||
|
|
||||||
F = TypeVar("F", bound=Callable[..., Any])
|
|
||||||
|
|
||||||
|
|
||||||
_cancellable_method_names = frozenset(
|
|
||||||
{
|
|
||||||
# `RestServlet`, `BaseFederationServlet` and `BaseFederationServerServlet`
|
|
||||||
# methods
|
|
||||||
"on_GET",
|
|
||||||
"on_PUT",
|
|
||||||
"on_POST",
|
|
||||||
"on_DELETE",
|
|
||||||
# `_AsyncResource`, `DirectServeHtmlResource` and `DirectServeJsonResource`
|
|
||||||
# methods
|
|
||||||
"_async_render_GET",
|
|
||||||
"_async_render_PUT",
|
|
||||||
"_async_render_POST",
|
|
||||||
"_async_render_DELETE",
|
|
||||||
"_async_render_OPTIONS",
|
|
||||||
# `ReplicationEndpoint` methods
|
|
||||||
"_handle_request",
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def cancellable(method: F) -> F:
|
|
||||||
"""Marks a servlet method as cancellable.
|
|
||||||
|
|
||||||
Methods with this decorator will be cancelled if the client disconnects before we
|
|
||||||
finish processing the request.
|
|
||||||
|
|
||||||
During cancellation, `Deferred.cancel()` will be invoked on the `Deferred` wrapping
|
|
||||||
the method. The `cancel()` call will propagate down to the `Deferred` that is
|
|
||||||
currently being waited on. That `Deferred` will raise a `CancelledError`, which will
|
|
||||||
propagate up, as per normal exception handling.
|
|
||||||
|
|
||||||
Before applying this decorator to a new endpoint, you MUST recursively check
|
|
||||||
that all `await`s in the function are on `async` functions or `Deferred`s that
|
|
||||||
handle cancellation cleanly, otherwise a variety of bugs may occur, ranging from
|
|
||||||
premature logging context closure, to stuck requests, to database corruption.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
class SomeServlet(RestServlet):
|
|
||||||
@cancellable
|
|
||||||
async def on_GET(self, request: SynapseRequest) -> ...:
|
|
||||||
...
|
|
||||||
"""
|
|
||||||
if method.__name__ not in _cancellable_method_names and not any(
|
|
||||||
method.__name__.startswith(prefix) for prefix in _cancellable_method_names
|
|
||||||
):
|
|
||||||
raise ValueError(
|
|
||||||
"@cancellable decorator can only be applied to servlet methods."
|
|
||||||
)
|
|
||||||
|
|
||||||
method.cancellable = True # type: ignore[attr-defined]
|
|
||||||
return method
|
|
||||||
|
|
||||||
|
|
||||||
def is_method_cancellable(method: Callable[..., Any]) -> bool:
|
|
||||||
"""Checks whether a servlet method has the `@cancellable` flag."""
|
|
||||||
return getattr(method, "cancellable", False)
|
|
||||||
|
|
||||||
|
|
||||||
def return_json_error(
|
def return_json_error(
|
||||||
f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig]
|
f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig]
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -389,7 +327,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
|
||||||
|
|
||||||
method_handler = getattr(self, "_async_render_%s" % (request_method,), None)
|
method_handler = getattr(self, "_async_render_%s" % (request_method,), None)
|
||||||
if method_handler:
|
if method_handler:
|
||||||
request.is_render_cancellable = is_method_cancellable(method_handler)
|
request.is_render_cancellable = is_function_cancellable(method_handler)
|
||||||
|
|
||||||
raw_callback_return = method_handler(request)
|
raw_callback_return = method_handler(request)
|
||||||
|
|
||||||
|
@ -551,7 +489,7 @@ class JsonResource(DirectServeJsonResource):
|
||||||
async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]:
|
async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]:
|
||||||
callback, servlet_classname, group_dict = self._get_handler_for_request(request)
|
callback, servlet_classname, group_dict = self._get_handler_for_request(request)
|
||||||
|
|
||||||
request.is_render_cancellable = is_method_cancellable(callback)
|
request.is_render_cancellable = is_function_cancellable(callback)
|
||||||
|
|
||||||
# Make sure we have an appropriate name for this handler in prometheus
|
# Make sure we have an appropriate name for this handler in prometheus
|
||||||
# (rather than the default of JsonResource).
|
# (rather than the default of JsonResource).
|
||||||
|
|
|
@ -26,12 +26,13 @@ from twisted.web.server import Request
|
||||||
|
|
||||||
from synapse.api.errors import HttpResponseException, SynapseError
|
from synapse.api.errors import HttpResponseException, SynapseError
|
||||||
from synapse.http import RequestTimedOutError
|
from synapse.http import RequestTimedOutError
|
||||||
from synapse.http.server import HttpServer, is_method_cancellable
|
from synapse.http.server import HttpServer
|
||||||
from synapse.http.site import SynapseRequest
|
from synapse.http.site import SynapseRequest
|
||||||
from synapse.logging import opentracing
|
from synapse.logging import opentracing
|
||||||
from synapse.logging.opentracing import trace_with_opname
|
from synapse.logging.opentracing import trace_with_opname
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
|
from synapse.util.cancellation import is_function_cancellable
|
||||||
from synapse.util.stringutils import random_string
|
from synapse.util.stringutils import random_string
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -311,7 +312,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
||||||
url_args = list(self.PATH_ARGS)
|
url_args = list(self.PATH_ARGS)
|
||||||
method = self.METHOD
|
method = self.METHOD
|
||||||
|
|
||||||
if self.CACHE and is_method_cancellable(self._handle_request):
|
if self.CACHE and is_function_cancellable(self._handle_request):
|
||||||
raise Exception(
|
raise Exception(
|
||||||
f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
|
f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
|
||||||
"is set. The cancellable flag would have no effect."
|
"is set. The cancellable flag would have no effect."
|
||||||
|
@ -359,6 +360,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
||||||
# The `@cancellable` decorator may be applied to `_handle_request`. But we
|
# The `@cancellable` decorator may be applied to `_handle_request`. But we
|
||||||
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
|
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
|
||||||
# so we have to set up the cancellable flag ourselves.
|
# so we have to set up the cancellable flag ourselves.
|
||||||
request.is_render_cancellable = is_method_cancellable(self._handle_request)
|
request.is_render_cancellable = is_function_cancellable(self._handle_request)
|
||||||
|
|
||||||
return await self._handle_request(request, **kwargs)
|
return await self._handle_request(request, **kwargs)
|
||||||
|
|
|
@ -37,7 +37,7 @@ from synapse.api.errors import (
|
||||||
)
|
)
|
||||||
from synapse.api.filtering import Filter
|
from synapse.api.filtering import Filter
|
||||||
from synapse.events.utils import format_event_for_client_v2
|
from synapse.events.utils import format_event_for_client_v2
|
||||||
from synapse.http.server import HttpServer, cancellable
|
from synapse.http.server import HttpServer
|
||||||
from synapse.http.servlet import (
|
from synapse.http.servlet import (
|
||||||
ResolveRoomIdMixin,
|
ResolveRoomIdMixin,
|
||||||
RestServlet,
|
RestServlet,
|
||||||
|
@ -57,6 +57,7 @@ from synapse.storage.state import StateFilter
|
||||||
from synapse.streams.config import PaginationConfig
|
from synapse.streams.config import PaginationConfig
|
||||||
from synapse.types import JsonDict, StreamToken, ThirdPartyInstanceID, UserID
|
from synapse.types import JsonDict, StreamToken, ThirdPartyInstanceID, UserID
|
||||||
from synapse.util import json_decoder
|
from synapse.util import json_decoder
|
||||||
|
from synapse.util.cancellation import cancellable
|
||||||
from synapse.util.stringutils import parse_and_validate_server_name, random_string
|
from synapse.util.stringutils import parse_and_validate_server_name, random_string
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
|
56
synapse/util/cancellation.py
Normal file
56
synapse/util/cancellation.py
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
from typing import Any, Callable, TypeVar
|
||||||
|
|
||||||
|
F = TypeVar("F", bound=Callable[..., Any])
|
||||||
|
|
||||||
|
|
||||||
|
def cancellable(function: F) -> F:
|
||||||
|
"""Marks a function as cancellable.
|
||||||
|
|
||||||
|
Servlet methods with this decorator will be cancelled if the client disconnects before we
|
||||||
|
finish processing the request.
|
||||||
|
|
||||||
|
Although this annotation is particularly useful for servlet methods, it's also
|
||||||
|
useful for intermediate functions, where it documents the fact that the function has
|
||||||
|
been audited for cancellation safety and needs to preserve that.
|
||||||
|
This then simplifies auditing new functions that call those same intermediate
|
||||||
|
functions.
|
||||||
|
|
||||||
|
During cancellation, `Deferred.cancel()` will be invoked on the `Deferred` wrapping
|
||||||
|
the method. The `cancel()` call will propagate down to the `Deferred` that is
|
||||||
|
currently being waited on. That `Deferred` will raise a `CancelledError`, which will
|
||||||
|
propagate up, as per normal exception handling.
|
||||||
|
|
||||||
|
Before applying this decorator to a new function, you MUST recursively check
|
||||||
|
that all `await`s in the function are on `async` functions or `Deferred`s that
|
||||||
|
handle cancellation cleanly, otherwise a variety of bugs may occur, ranging from
|
||||||
|
premature logging context closure, to stuck requests, to database corruption.
|
||||||
|
|
||||||
|
See the documentation page on Cancellation for more information.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
class SomeServlet(RestServlet):
|
||||||
|
@cancellable
|
||||||
|
async def on_GET(self, request: SynapseRequest) -> ...:
|
||||||
|
...
|
||||||
|
"""
|
||||||
|
|
||||||
|
function.cancellable = True # type: ignore[attr-defined]
|
||||||
|
return function
|
||||||
|
|
||||||
|
|
||||||
|
def is_function_cancellable(function: Callable[..., Any]) -> bool:
|
||||||
|
"""Checks whether a servlet method has the `@cancellable` flag."""
|
||||||
|
return getattr(function, "cancellable", False)
|
|
@ -18,9 +18,10 @@ from typing import Dict, List, Tuple
|
||||||
from synapse.api.errors import Codes
|
from synapse.api.errors import Codes
|
||||||
from synapse.federation.transport.server import BaseFederationServlet
|
from synapse.federation.transport.server import BaseFederationServlet
|
||||||
from synapse.federation.transport.server._base import Authenticator, _parse_auth_header
|
from synapse.federation.transport.server._base import Authenticator, _parse_auth_header
|
||||||
from synapse.http.server import JsonResource, cancellable
|
from synapse.http.server import JsonResource
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
|
from synapse.util.cancellation import cancellable
|
||||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
|
|
|
@ -18,7 +18,6 @@ from typing import Tuple
|
||||||
from unittest.mock import Mock
|
from unittest.mock import Mock
|
||||||
|
|
||||||
from synapse.api.errors import Codes, SynapseError
|
from synapse.api.errors import Codes, SynapseError
|
||||||
from synapse.http.server import cancellable
|
|
||||||
from synapse.http.servlet import (
|
from synapse.http.servlet import (
|
||||||
RestServlet,
|
RestServlet,
|
||||||
parse_json_object_from_request,
|
parse_json_object_from_request,
|
||||||
|
@ -28,6 +27,7 @@ from synapse.http.site import SynapseRequest
|
||||||
from synapse.rest.client._base import client_patterns
|
from synapse.rest.client._base import client_patterns
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
|
from synapse.util.cancellation import cancellable
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
from tests.http.server._base import test_disconnect
|
from tests.http.server._base import test_disconnect
|
||||||
|
|
|
@ -18,11 +18,12 @@ from typing import Tuple
|
||||||
from twisted.web.server import Request
|
from twisted.web.server import Request
|
||||||
|
|
||||||
from synapse.api.errors import Codes
|
from synapse.api.errors import Codes
|
||||||
from synapse.http.server import JsonResource, cancellable
|
from synapse.http.server import JsonResource
|
||||||
from synapse.replication.http import REPLICATION_PREFIX
|
from synapse.replication.http import REPLICATION_PREFIX
|
||||||
from synapse.replication.http._base import ReplicationEndpoint
|
from synapse.replication.http._base import ReplicationEndpoint
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
|
from synapse.util.cancellation import cancellable
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
from tests.http.server._base import test_disconnect
|
from tests.http.server._base import test_disconnect
|
||||||
|
|
|
@ -26,12 +26,12 @@ from synapse.http.server import (
|
||||||
DirectServeJsonResource,
|
DirectServeJsonResource,
|
||||||
JsonResource,
|
JsonResource,
|
||||||
OptionsResource,
|
OptionsResource,
|
||||||
cancellable,
|
|
||||||
)
|
)
|
||||||
from synapse.http.site import SynapseRequest, SynapseSite
|
from synapse.http.site import SynapseRequest, SynapseSite
|
||||||
from synapse.logging.context import make_deferred_yieldable
|
from synapse.logging.context import make_deferred_yieldable
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
from synapse.util.cancellation import cancellable
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
from tests.http.server._base import test_disconnect
|
from tests.http.server._base import test_disconnect
|
||||||
|
|
Loading…
Reference in a new issue