Do not drop unhandled exceptions in Python (#3170)

- Do not use a non-zero-to-zero transition in the number of outstanding
  RPCs to determine the completion of a Python program until after the
  synchronous piece of the program has finished running.
- Instead of using a future to indicate that either a) a non-zero-to-zero
  transition in the number of outstanding RPCs has occurred, or b) an
  unhandled exception has occurred, a) observe the transition itself,
  and b) use an optional exception field to track the presence or
  absence of an exception.

Fixes #3162.
This commit is contained in:
Pat Gavlin 2019-09-06 13:53:07 -07:00 committed by GitHub
parent 685b30b685
commit 48c8ea1e8a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 205 additions and 47 deletions

View file

@ -3,6 +3,9 @@ CHANGELOG
## HEAD (Unreleased)
- Fix a bug that caused the Python runtime to ignore unhandled exceptions and erroneously report that a Pulumi program executed successfully.
[#3170](https://github.com/pulumi/pulumi/pull/3170)
## 1.0.0 (2019-09-03)
- No significant changes.

View file

@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import sys
import traceback
from typing import Callable, Awaitable, Tuple, Any
from typing import Callable, Awaitable, Tuple, Any, Optional
from .. import log
@ -25,26 +26,25 @@ class RPCManager:
outstanding RPCs.
"""
zero_cond: asyncio.Condition
"""
zero_cond is notified whenever the number of active RPCs transitions from
one to zero.
"""
count: int
"""
The number of active RPCs.
"""
exception_future: asyncio.Future
unhandled_exception: Optional[Exception]
"""
Future that is resolved whenever an unhandled exception occurs.
The first unhandled exception encountered during an RPC, if any occurs.
"""
exception_traceback: Optional[Any]
"""
The traceback associated with unhandled_exception, if any.
"""
def __init__(self):
self.zero_cond = asyncio.Condition()
self.count = 0
self.exception_future = asyncio.Future()
self.unhandled_exception = None
self.exception_traceback = None
def do_rpc(self, name: str, rpc_function: Callable[..., Awaitable[Tuple[Any, Exception]]]) -> Callable[..., Awaitable[Tuple[Any, Exception]]]:
"""
@ -60,9 +60,8 @@ class RPCManager:
"""
async def rpc_wrapper(*args, **kwargs):
log.debug(f"beginning rpc {name}")
async with self.zero_cond:
self.count += 1
log.debug(f"recorded new RPC, {self.count} RPCs outstanding")
self.count += 1
log.debug(f"recorded new RPC, {self.count} RPCs outstanding")
try:
result = await rpc_function(*args, **kwargs)
@ -70,39 +69,19 @@ class RPCManager:
except Exception as exn:
log.debug(f"RPC failed with exception:")
log.debug(traceback.format_exc())
if not self.exception_future.done():
self.exception_future.set_exception(exn)
if self.unhandled_exception is None:
self.unhandled_exception = exn
self.exception_traceback = sys.exc_info()[2]
result = None
exception = exn
async with self.zero_cond:
self.count -= 1
if self.count == 0:
log.debug("All RPC completed, signalling completion")
if not self.exception_future.done():
self.exception_future.set_result(None)
self.zero_cond.notify_all()
log.debug(f"recorded RPC completion, {self.count} RPCs outstanding")
self.count -= 1
log.debug(f"recorded RPC completion, {self.count} RPCs outstanding")
return result, exception
return rpc_wrapper
async def wait_for_outstanding_rpcs(self) -> None:
"""
Blocks the calling task until all outstanding RPCs have completed. Returns immediately if
no RPCs have been initiated.
"""
async with self.zero_cond:
while self.count != 0:
await self.zero_cond.wait()
def unhandled_exeception(self) -> asyncio.Future:
"""
Returns a Future that is resolved abnormally whenever an RPC fails due to an unhandled exception.
"""
return self.exception_future
RPC_MANAGER: RPCManager = RPCManager()
"""

View file

@ -35,10 +35,6 @@ async def run_in_stack(func: Callable):
"""
try:
Stack(func)
# If an exception occurred when doing an RPC, this await will propegate the exception
# to the main thread.
await RPC_MANAGER.unhandled_exeception()
finally:
log.debug("Waiting for outstanding RPCs to complete")
@ -47,10 +43,10 @@ async def run_in_stack(func: Callable):
#
# Note that "asyncio.sleep(0)" is the blessed way to do this:
# https://github.com/python/asyncio/issues/284#issuecomment-154180935
await asyncio.sleep(0)
# Wait for all outstanding RPCs to retire.
await RPC_MANAGER.wait_for_outstanding_rpcs()
while True:
await asyncio.sleep(0)
if RPC_MANAGER.count == 0:
break
# Asyncio event loops require that all outstanding tasks be completed by the time that the
# event loop closes. If we're at this point and there are no outstanding RPCs, we should
@ -72,6 +68,9 @@ async def run_in_stack(func: Callable):
# Once we get scheduled again, all tasks have exited and we're good to go.
log.debug("run_in_stack completed")
if RPC_MANAGER.unhandled_exception is not None:
raise RPC_MANAGER.unhandled_exception.with_traceback(RPC_MANAGER.exception_traceback)
class Stack(ComponentResource):
"""

View file

@ -0,0 +1,13 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@ -0,0 +1,39 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from pulumi import CustomResource, Output, ResourceOptions
from pulumi.runtime import invoke
class MyResource(CustomResource):
    """
    Custom resource of type ``test:index:MyResource`` used by this test program.

    It forwards a single input property, ``value``, to the resource registration.
    """

    value: Output[str]

    def __init__(self, name, value, opts):
        # Build the property bag up front so the registration call stays on one line.
        resource_props = {"value": value}
        CustomResource.__init__(self, "test:index:MyResource", name, props=resource_props, opts=opts)
# We run this invoke first because of the way in which it interacts with the RPC manager. Prior to #3170, the RPC
# manager would decide that all outstanding RPCs had finished on any non-zero -> zero transition in the number of
# outstanding RPCs. Because an invoke is considered an RPC, running any synchronous invokes before any other RPC
# would confuse this logic, which would cause us to drop exceptions that occurred during subsequent RPCs and
# incorrectly consider failed programs to have succeeded.
invoke("test:index:MyFunction", props={})

# resA has a plain string value and no resource options.
resA = MyResource("resourceA", "foo", None)

# Create a future that has already been completed with an exception and hand it to resB as its `value`.
# NOTE(review): presumably the exception surfaces when the runtime awaits this future while preparing resB's
# registration -- confirm against the property serialization code.
fut = asyncio.Future()
fut.set_exception(Exception("oh no"))

# `depends_on` chains the resources A -> B -> C so that they are processed in a predictable order.
resB = MyResource("resourceB", fut, ResourceOptions(depends_on=[resA]))
resC = MyResource("resourceC", "baz", ResourceOptions(depends_on=[resB]))

View file

@ -0,0 +1,37 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import path
from ..util import LanghostTest
class TestFutureFailure(LanghostTest):
    """
    Runs the ``future_failure`` program and expects it to fail with a non-zero exit code
    after a single resource registration.
    """

    def test_future_failure(self):
        program_dir = path.join(self.base_path(), "future_failure")
        self.run_test(
            program=program_dir,
            expected_resource_count=1,
            expected_error="Program exited with non-zero exit code: 1")

    def invoke(self, _ctx, token, args, provider, _version):
        # The program only ever invokes this one function; it returns no failures and no values.
        self.assertEqual("test:index:MyFunction", token)
        failures, outputs = [], {}
        return failures, outputs

    def register_resource(self, _ctx, _dry_run, ty, name, resource, _deps,
                          _parent, _custom, _protect, _provider, _property_deps, _delete_before_replace,
                          _ignore_changes, _version):
        # Every resource registered by the program must be of the test resource type.
        self.assertEqual("test:index:MyResource", ty)
        urn = self.make_urn(ty, name)
        # Echo the inputs back as the resource's state, using the name as its ID.
        return {"urn": urn, "id": name, "object": resource}

View file

@ -0,0 +1,13 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@ -0,0 +1,38 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pulumi import CustomResource, Output, ResourceOptions
from pulumi.runtime import invoke
class MyResource(CustomResource):
    """
    Custom resource of type ``test:index:MyResource`` used by this test program.

    It forwards a single input property, ``value``, to the resource registration.
    """

    value: Output[str]

    def __init__(self, name, value, opts):
        # Assemble the inputs separately from the registration call for readability.
        inputs = {"value": value}
        CustomResource.__init__(self, "test:index:MyResource", name, props=inputs, opts=opts)
# We run this invoke first because of the way in which it interacts with the RPC manager. Prior to #3170, the RPC
# manager would decide that all outstanding RPCs had finished on any non-zero -> zero transition in the number of
# outstanding RPCs. Because an invoke is considered an RPC, running any synchronous invokes before any other RPC
# would confuse this logic, which would cause us to drop exceptions that occurred during subsequent RPCs and
# incorrectly consider failed programs to have succeeded.
invoke("test:index:MyFunction", props={})

# resA has a plain string value and no resource options.
resA = MyResource("resourceA", "foo", None)

# The property marshaller does not support values of type `bytes`, so the runtime should fail when preparing this
# RegisterResource RPC. We use `depends_on` to ensure that resources are registered in a predictable order.
resB = MyResource("resourceB", "bar".encode("utf8"), ResourceOptions(depends_on=[resA]))

# resC depends on the failing resB.
# NOTE(review): presumably resC is never registered -- the accompanying test expects a resource count of 1.
resC = MyResource("resourceC", "baz", ResourceOptions(depends_on=[resB]))

View file

@ -0,0 +1,37 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import path
from ..util import LanghostTest
class TestMarshalFailure(LanghostTest):
    """
    Runs the ``marshal_failure`` program and expects it to fail with a non-zero exit code
    after a single resource registration.
    """

    def test_marshal_failure(self):
        program_dir = path.join(self.base_path(), "marshal_failure")
        self.run_test(
            program=program_dir,
            expected_resource_count=1,
            expected_error="Program exited with non-zero exit code: 1")

    def invoke(self, _ctx, token, args, provider, _version):
        # The program only ever invokes this one function; it returns no failures and no values.
        self.assertEqual("test:index:MyFunction", token)
        failures, outputs = [], {}
        return failures, outputs

    def register_resource(self, _ctx, _dry_run, ty, name, resource, _deps,
                          _parent, _custom, _protect, _provider, _property_deps, _delete_before_replace,
                          _ignore_changes, _version):
        # Every resource registered by the program must be of the test resource type.
        self.assertEqual("test:index:MyResource", ty)
        urn = self.make_urn(ty, name)
        # Echo the inputs back as the resource's state, using the name as its ID.
        return {"urn": urn, "id": name, "object": resource}