[sdk/python] - Gracefully handle monitor shutdown (#6249)

This commit is contained in:
Komal 2021-02-04 20:28:39 -07:00 committed by GitHub
parent 059382c336
commit 3a3b96de72
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 62 additions and 77 deletions

View file

@ -2,7 +2,9 @@ CHANGELOG
=========
## HEAD (Unreleased)
_(none)_
- [sdk/python] Gracefully handle monitor shutdown in the python runtime without exiting the process.
[#6249](https://github.com/pulumi/pulumi/pull/6249)
## 2.20.0 (2021-02-03)

View file

@ -13,8 +13,7 @@
# limitations under the License.
import asyncio
import os
import sys
from typing import Any, Awaitable, Optional, TYPE_CHECKING
from typing import Optional, TYPE_CHECKING
import grpc
from .. import log
@ -23,7 +22,7 @@ from ..invoke import InvokeOptions
from ..runtime.proto import provider_pb2
from . import rpc
from .rpc_manager import RPC_MANAGER
from .settings import get_monitor
from .settings import get_monitor, handle_grpc_error
from .sync_await import _sync_await
if TYPE_CHECKING:
@ -66,6 +65,7 @@ class InvokeResult:
__iter__ = __await__
def invoke(tok: str, props: 'Inputs', opts: Optional[InvokeOptions] = None, typ: Optional[type] = None) -> InvokeResult:
"""
invoke dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
@ -109,18 +109,7 @@ def invoke(tok: str, props: 'Inputs', opts: Optional[InvokeOptions] = None, typ:
try:
return monitor.Invoke(req)
except grpc.RpcError as exn:
# gRPC-python gets creative with their exceptions. grpc.RpcError as a type is useless;
# the usefullness come from the fact that it is polymorphically also a grpc.Call and thus has
# the .code() member. Pylint doesn't know this because it's not known statically.
#
# Neither pylint nor I are the only ones who find this confusing:
# https://github.com/grpc/grpc/issues/10885#issuecomment-302581315
# pylint: disable=no-member
if exn.code() == grpc.StatusCode.UNAVAILABLE:
sys.exit(0)
details = exn.details()
raise Exception(details)
handle_grpc_error(exn)
resp = await asyncio.get_event_loop().run_in_executor(None, do_invoke)

View file

@ -13,7 +13,6 @@
# limitations under the License.
import asyncio
import os
import sys
import traceback
from typing import Optional, Any, Callable, List, NamedTuple, Dict, Set, Tuple, Union, TYPE_CHECKING, cast
@ -22,10 +21,9 @@ import grpc
from . import rpc, settings, known_types
from .. import log
from .invoke import invoke
from ..runtime.proto import provider_pb2, resource_pb2
from .rpc_manager import RPC_MANAGER
from ..metadata import get_project, get_stack
from .settings import handle_grpc_error
if TYPE_CHECKING:
from .. import Resource, ResourceOptions, CustomResource, Inputs, Output
@ -157,7 +155,7 @@ def resource_output(res: 'Resource') -> Tuple[Callable[[Any, bool, bool, Optiona
known_future.set_result(known)
secret_future.set_result(secret)
return (resolve, Output({res}, value_future, known_future, secret_future))
return resolve, Output({res}, value_future, known_future, secret_future)
def get_resource(res: 'Resource', props: 'Inputs', custom: bool, urn: str) -> None:
@ -191,18 +189,7 @@ def get_resource(res: 'Resource', props: 'Inputs', custom: bool, urn: str) -> No
try:
return monitor.Invoke(req)
except grpc.RpcError as exn:
# gRPC-python gets creative with their exceptions. grpc.RpcError as a type is useless;
# the usefullness come from the fact that it is polymorphically also a grpc.Call and thus has
# the .code() member. Pylint doesn't know this because it's not known statically.
#
# Neither pylint nor I are the only ones who find this confusing:
# https://github.com/grpc/grpc/issues/10885#issuecomment-302581315
# pylint: disable=no-member
if exn.code() == grpc.StatusCode.UNAVAILABLE:
sys.exit(0)
details = exn.details()
raise Exception(details)
handle_grpc_error(exn)
resp = await asyncio.get_event_loop().run_in_executor(None, do_invoke)
@ -310,14 +297,7 @@ def read_resource(res: 'CustomResource', ty: str, name: str, props: 'Inputs', op
try:
return monitor.ReadResource(req)
except grpc.RpcError as exn:
# See the comment on invoke for the justification for disabling
# this warning
# pylint: disable=no-member
if exn.code() == grpc.StatusCode.UNAVAILABLE:
sys.exit(0)
details = exn.details()
raise Exception(details)
handle_grpc_error(exn)
resp = await asyncio.get_event_loop().run_in_executor(None, do_rpc_call)
@ -444,7 +424,7 @@ def register_resource(res: 'Resource',
remote=remote,
)
from ..resource import create_urn # pylint: disable=import-outside-toplevel
from ..resource import create_urn # pylint: disable=import-outside-toplevel
mock_urn = await create_urn(name, ty, resolver.parent_urn).future()
def do_rpc_call():
@ -456,14 +436,7 @@ def register_resource(res: 'Resource',
try:
return monitor.RegisterResource(req)
except grpc.RpcError as exn:
# See the comment on invoke for the justification for disabling
# this warning
# pylint: disable=no-member
if exn.code() == grpc.StatusCode.UNAVAILABLE:
sys.exit(0)
details = exn.details()
raise Exception(details)
handle_grpc_error(exn)
resp = await asyncio.get_event_loop().run_in_executor(None, do_rpc_call)
except Exception as exn:
@ -475,6 +448,9 @@ def register_resource(res: 'Resource',
resolve_id(None, True, False, exn)
raise
if resp is None:
return
log.debug(f"resource registration successful: ty={ty}, urn={resp.urn}")
resolve_urn(resp.urn, True, False, None)
if resolve_id is not None:
@ -491,7 +467,6 @@ def register_resource(res: 'Resource',
urns = list(v.urns)
deps[k] = set(map(new_dependency, urns))
rpc.resolve_outputs(res, resolver.serialized_props, resp.object, deps, resolvers)
asyncio.ensure_future(RPC_MANAGER.do_rpc(
@ -516,14 +491,7 @@ def register_resource_outputs(res: 'Resource', outputs: 'Union[Inputs, Output[In
try:
return monitor.RegisterResourceOutputs(req)
except grpc.RpcError as exn:
# See the comment on invoke for the justification for disabling
# this warning
# pylint: disable=no-member
if exn.code() == grpc.StatusCode.UNAVAILABLE:
sys.exit(0)
details = exn.details()
raise Exception(details)
handle_grpc_error(exn)
await asyncio.get_event_loop().run_in_executor(None, do_rpc_call)
log.debug(
@ -549,9 +517,9 @@ class RegisterResponse:
# pylint: disable=redefined-builtin
def __init__(self,
urn: str,
id: str,
id: Optional[str],
object: struct_pb2.Struct,
propertyDependencies: Dict[str, PropertyDependencies]):
propertyDependencies: Optional[Dict[str, PropertyDependencies]]):
self.urn = urn
self.id = id
self.object = object

View file

@ -17,7 +17,6 @@ Runtime settings and configuration.
"""
import asyncio
import os
import sys
from typing import Optional, Union, Any, TYPE_CHECKING
import grpc
@ -220,15 +219,9 @@ async def monitor_supports_feature(feature: str) -> bool:
resp = monitor.SupportsFeature(req)
return resp.hasSupport
except grpc.RpcError as exn:
# See the comment on invoke for the justification for disabling
# this warning
# pylint: disable=no-member
if exn.code() == grpc.StatusCode.UNAVAILABLE:
sys.exit(0)
if exn.code() == grpc.StatusCode.UNIMPLEMENTED:
return False
details = exn.details()
raise Exception(details)
handle_grpc_error(exn)
result = await asyncio.get_event_loop().run_in_executor(None, do_rpc_call)
SETTINGS.feature_support[feature] = result
@ -236,6 +229,23 @@ async def monitor_supports_feature(feature: str) -> bool:
return SETTINGS.feature_support[feature]
def handle_grpc_error(exn: grpc.RpcError):
# gRPC-python gets creative with their exceptions. grpc.RpcError as a type is useless;
# the usefulness come from the fact that it is polymorphically also a grpc.Call and thus has
# the .code() member. Pylint doesn't know this because it's not known statically.
#
# Neither pylint nor I are the only ones who find this confusing:
# https://github.com/grpc/grpc/issues/10885#issuecomment-302581315
# pylint: disable=no-member
if exn.code() == grpc.StatusCode.UNAVAILABLE:
# If the monitor is unavailable, it is in the process of shutting down or has already
# shut down. Don't emit an error if this is the case.
return
details = exn.details()
raise Exception(details)
async def monitor_supports_secrets() -> bool:
return await monitor_supports_feature("secrets")

View file

@ -16,7 +16,6 @@
Support for automatic stack components.
"""
import asyncio
import collections
from inspect import isawaitable
from typing import Callable, Any, Dict, List, TYPE_CHECKING
@ -45,9 +44,10 @@ async def run_pulumi_func(func: Callable):
# We await each RPC in turn so that this loop will actually block rather than busy-wait.
while True:
await asyncio.sleep(0)
if len(RPC_MANAGER.rpcs) == 0:
rpcs_remaining = len(RPC_MANAGER.rpcs)
if rpcs_remaining == 0:
break
log.debug(f"waiting for quiescence; {len(RPC_MANAGER.rpcs)} RPCs outstanding")
log.debug(f"waiting for quiescence; {rpcs_remaining} RPCs outstanding")
await RPC_MANAGER.rpcs.pop()
# Asyncio event loops require that all outstanding tasks be completed by the time that the

View file

@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import grpc
import sys
import traceback
from contextlib import suppress
from .workspace import PulumiFn
from ...log import *
from ... import log
from ...runtime.proto import language_pb2, plugin_pb2, LanguageRuntimeServicer
from ...runtime import run_in_stack, reset_options, set_all_config
from ...errors import RunError
@ -61,10 +63,24 @@ class LanguageServer(LanguageRuntimeServicer):
try:
loop.run_until_complete(run_in_stack(self.program))
except RunError as exn:
result.error = str(exn)
msg = str(exn)
log.error(msg)
result.error = str(msg)
return result
except grpc.RpcError as exn:
# If the monitor is unavailable, it is in the process of shutting down or has already
# shut down. Don't emit an error if this is the case.
if exn.code() == grpc.StatusCode.UNAVAILABLE:
log.debug("Resource monitor has terminated, shutting down.")
else:
msg = f"RPC error: {exn.details()}"
log.error(msg)
result.error = msg
return result
except Exception as exn:
result.error = str(f"python inline source runtime error: {exn}\n{traceback.format_exc()}")
msg = str(f"python inline source runtime error: {exn}\n{traceback.format_exc()}")
log.error(msg)
result.error = msg
return result
finally:
# If there's an exception during `run_in_stack`, it may result in pending asyncio tasks remaining unresolved
@ -72,6 +88,7 @@ class LanguageServer(LanguageRuntimeServicer):
# logged to stdout. To avoid this, we collect all the unresolved tasks in the loop and cancel them before
# closing the loop.
pending = asyncio.Task.all_tasks(loop) if _py_version_less_than_3_7 else asyncio.all_tasks(loop)
log.debug(f"Cancelling {len(pending)} tasks.")
for task in pending:
task.cancel()
with suppress(asyncio.CancelledError):

View file

@ -22,7 +22,7 @@ from typing import List, Any, Mapping, MutableMapping, Optional
from .cmd import CommandResult, _run_pulumi_cmd, OnOutput
from .config import ConfigValue, ConfigMap, _SECRET_SENTINEL
from .errors import StackAlreadyExistsError
from .server import LanguageServer
from ._server import LanguageServer
from .workspace import Workspace, PulumiFn, Deployment
from ...runtime.settings import _GRPC_CHANNEL_OPTIONS
from ...runtime.proto import language_pb2_grpc

View file

@ -26,8 +26,7 @@ from .stack_settings import StackSettings
from .project_settings import ProjectSettings
from .config import ConfigMap, ConfigValue
# TODO improve typing to encapsulate stack exports
PulumiFn = Callable[[], Any]
PulumiFn = Callable[[], None]
class StackSummary: