Fix a reentrancy issue in _sync_await. (#3056)

_sync_await was not reentrant with respect to _run_once: the latter
captures the length of the ready list before it iterates it, and the
former drains the ready list by reentering _run_once. Fix this by
tracking the length of the list before pumping the event loop and then
pushing cancelled handles on to the list as necessary after pumping the
loop.

These changes also fix an issue with `export`ing awaitables.

Fixes #3038.
This commit is contained in:
Pat Gavlin 2019-08-08 19:51:11 -07:00 committed by GitHub
parent 24e2c6f791
commit 91af1a93c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 82 additions and 0 deletions

View file

@ -14,6 +14,9 @@ CHANGELOG
- Fix a bug in the Node.JS SDK that caused failure details for provider functions to go unreported.
[#3048](https://github.com/pulumi/pulumi/pull/3048)
- Fix a bug in the Python SDK that caused crashes when using asynchronous data sources.
[#3056](https://github.com/pulumi/pulumi/pull/3056)
## 0.17.28 (2019-08-05)
- Retry renaming a temporary folder during plugin installation

View file

@ -66,10 +66,23 @@ def _sync_await(awaitable: Awaitable[Any]) -> Any:
# Pump the event loop until the future is complete. This is the kernel of BaseEventLoop.run_forever, and may not
# work with alternative event loop implementations.
#
# In order to make this reentrant with respect to _run_once, we keep track of the number of event handles on the
# ready list and ensure that there are exactly that many handles on the list once we are finished.
#
# See https://github.com/python/cpython/blob/3.6/Lib/asyncio/base_events.py#L1428-L1452 for the details of the
# _run_once kernel with which we need to cooperate.
ntodo = len(loop._ready)
while not fut.done() and not fut.cancelled():
loop._run_once()
if loop._stopping:
break
# If we drained the ready list past what a calling _run_once would have expected, fix things up by pushing
# cancelled handles onto the list.
while len(loop._ready) < ntodo:
handle = asyncio.Handle(None, None, loop)
handle._cancelled = True
loop._ready.append(handle)
# If we were executing inside a task, restore its context and continue on.
if task is not None:

View file

@ -17,6 +17,7 @@ Support for automatic stack components.
"""
import asyncio
import collections
from inspect import isawaitable
from typing import Callable, Any, Dict, List
from ..resource import ComponentResource, Resource
@ -146,6 +147,9 @@ def massage(attr: Any, seen: List[Any]):
if isinstance(attr, Output):
return attr.apply(lambda v: massage(v, seen))
if isawaitable(attr):
return Output.from_input(attr).apply(lambda v: massage(v, seen))
if hasattr(attr, "__dict__"):
# recurse on the dictionary itself. It will be handled above.
return massage(attr.__dict__, seen)

View file

@ -0,0 +1,13 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@ -0,0 +1,22 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from pulumi import export
from pulumi.runtime import invoke
async def await_invoke():
return await invoke("test:index:MyFunction", {})
export("f1", asyncio.ensure_future(await_invoke()))
export("f2", asyncio.ensure_future(await_invoke()))

View file

@ -0,0 +1,27 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import path
from ..util import LanghostTest
class TestInvokeFuture(LanghostTest):
def test_invoke_future(self):
self.run_test(
program=path.join(self.base_path(), "invoke_future"),
expected_resource_count=0)
def invoke(self, _ctx, token, args, provider, _version):
self.assertEqual("test:index:MyFunction", token)
self.assertEqual("", provider)
return [], {"value": 42}