Make pulumi.runtime.invoke
synchronous. (#3019)
These changes make the `pulumi.runtime.invoke` function invokable in a synchronous manner. Because this function still needs to perform asynchronous work under the covers--namely awaiting a provider URN and ID if a provider instance is present in the `InvokeOptions`--this requires some creativity. This creativity comes in the form of a helper function, `_sync_await`, that performs a logical yield from the currently running event, manually runs the event loop until the given future completes, performs a logical resume back to the currently executing event, and returns the result of the future. The code in `_sync_await` is a bit scary, as it relies upon knowledge of (and functions in) the internals of the `asyncio` package. The necessary work performed in this function was derived from the implementations of `task_step` (which pointed out the need to call `_{enter,leave}_task`) and `BaseEventLoop.run_forever` (which illustrated how the event loop is pumped). In addition to potential breaking changes to these internals, the code may not work if a user has provided an alternative implementation for `EventLoop`. That said, the code is a close enough copy of `BaseEventLoop.run_forever` that it should be a reasonable solution.
This commit is contained in:
parent
654f5e2d35
commit
63eb7abb59
|
@ -15,6 +15,8 @@ CHANGELOG
|
|||
|
||||
- Fix `get_secret` in Python SDK always returning None.
|
||||
|
||||
- Make `pulumi.runtime.invoke` synchronous in the Python SDK [#3019](https://github.com/pulumi/pulumi/pull/3019)
|
||||
|
||||
### Compatibility
|
||||
|
||||
- Deprecated functions in `@pulumi/pulumi` will now issue warnings if you call them. Please migrate
|
||||
|
|
|
@ -24,8 +24,59 @@ from ..runtime.proto import provider_pb2
|
|||
from . import rpc
|
||||
from .rpc_manager import RPC_MANAGER
|
||||
|
||||
def _sync_await(awaitable: Awaitable[Any]) -> Any:
|
||||
"""
|
||||
_sync_await waits for the given future to complete by effectively yielding the current task and pumping the event
|
||||
loop.
|
||||
"""
|
||||
|
||||
def invoke(tok: str, props: Inputs, opts: InvokeOptions = None) -> Awaitable[Any]:
|
||||
# Fetch the current event loop and ensure a future.
|
||||
loop = asyncio.get_event_loop()
|
||||
fut = asyncio.ensure_future(awaitable)
|
||||
|
||||
# If the loop is not running, we can just use run_until_complete. Without this, we would need to duplicate a fair
|
||||
# amount of bookkeeping logic around loop startup and shutdown.
|
||||
if not loop.is_running():
|
||||
return loop.run_until_complete(fut)
|
||||
|
||||
# If we are executing inside a task, pretend we've returned from its current callback--effectively yielding to
|
||||
# the event loop--by calling _leave_task.
|
||||
task = asyncio.Task.current_task(loop)
|
||||
if task is not None:
|
||||
asyncio.tasks._leave_task(loop, task)
|
||||
|
||||
# Pump the event loop until the future is complete. This is the kernel of BaseEventLoop.run_forever, and may not
|
||||
# work with alternative event loop implementations.
|
||||
while not fut.done() and not fut.cancelled():
|
||||
loop._run_once()
|
||||
if loop._stopping:
|
||||
break
|
||||
|
||||
# If we were executing inside a task, restore its context and continue on.
|
||||
if task is not None:
|
||||
asyncio.tasks._enter_task(loop, task)
|
||||
|
||||
# Return the result of the future.
|
||||
return fut.result()
|
||||
|
||||
class InvokeResult:
|
||||
"""
|
||||
InvokeResult is a helper type that wraps a prompt value in an Awaitable.
|
||||
"""
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
# pylint: disable=using-constant-test
|
||||
def __await__(self):
|
||||
# We need __await__ to be an iterator, but we only want it to return one value. As such, we use
|
||||
# `if False: yield` to construct this.
|
||||
if False:
|
||||
yield self.value
|
||||
return self.value
|
||||
|
||||
__iter__ = __await__
|
||||
|
||||
def invoke(tok: str, props: Inputs, opts: InvokeOptions = None) -> InvokeResult:
|
||||
"""
|
||||
invoke dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
|
||||
can be a bag of computed values (Ts or Awaitable[T]s), and the result is a Awaitable[Any] that
|
||||
|
@ -72,6 +123,7 @@ def invoke(tok: str, props: Inputs, opts: InvokeOptions = None) -> Awaitable[Any
|
|||
raise Exception(details)
|
||||
|
||||
resp = await asyncio.get_event_loop().run_in_executor(None, do_invoke)
|
||||
|
||||
log.debug(f"Invoking function completed successfully: tok={tok}")
|
||||
# If the invoke failed, raise an error.
|
||||
if resp.failures:
|
||||
|
@ -83,4 +135,10 @@ def invoke(tok: str, props: Inputs, opts: InvokeOptions = None) -> Awaitable[Any
|
|||
return rpc.deserialize_properties(ret_obj)
|
||||
return {}
|
||||
|
||||
return asyncio.ensure_future(RPC_MANAGER.do_rpc("invoke", do_invoke)())
|
||||
async def do_rpc():
|
||||
resp, exn = await RPC_MANAGER.do_rpc("invoke", do_invoke)()
|
||||
if exn is not None:
|
||||
raise exn
|
||||
return resp
|
||||
|
||||
return InvokeResult(_sync_await(asyncio.ensure_future(do_rpc())))
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# limitations under the License.
|
||||
import asyncio
|
||||
import traceback
|
||||
from typing import Callable, Awaitable
|
||||
from typing import Callable, Awaitable, Tuple, Any
|
||||
from .. import log
|
||||
|
||||
|
||||
|
@ -46,7 +46,7 @@ class RPCManager:
|
|||
self.count = 0
|
||||
self.exception_future = asyncio.Future()
|
||||
|
||||
def do_rpc(self, name: str, rpc_function: Callable[..., Awaitable[None]]) -> Callable[..., Awaitable[None]]:
|
||||
def do_rpc(self, name: str, rpc_function: Callable[..., Awaitable[Tuple[Any, Exception]]]) -> Callable[..., Awaitable[Tuple[Any, Exception]]]:
|
||||
"""
|
||||
Wraps a given RPC function by producing an awaitable function suitable to be run in the asyncio
|
||||
event loop. The wrapped function catches all unhandled exceptions and reports them to the exception
|
||||
|
@ -66,12 +66,14 @@ class RPCManager:
|
|||
|
||||
try:
|
||||
result = await rpc_function(*args, **kwargs)
|
||||
exception = None
|
||||
except Exception as exn:
|
||||
log.debug(f"RPC failed with exception:")
|
||||
log.debug(traceback.format_exc())
|
||||
if not self.exception_future.done():
|
||||
self.exception_future.set_exception(exn)
|
||||
result = None
|
||||
exception = exn
|
||||
|
||||
async with self.zero_cond:
|
||||
self.count -= 1
|
||||
|
@ -82,7 +84,7 @@ class RPCManager:
|
|||
self.zero_cond.notify_all()
|
||||
log.debug(f"recorded RPC completion, {self.count} RPCs outstanding")
|
||||
|
||||
return result
|
||||
return result, exception
|
||||
|
||||
return rpc_wrapper
|
||||
|
||||
|
|
|
@ -38,8 +38,8 @@ class MyComponent(ComponentResource):
|
|||
|
||||
# Explicitly use a provider for an Invoke.
|
||||
prov = MyProvider("testprov")
|
||||
async def do_provider_invoke():
|
||||
value = await invoke("test:index:MyFunction", props={"value": 9000}, opts=InvokeOptions(provider=prov))
|
||||
def do_provider_invoke():
|
||||
value = invoke("test:index:MyFunction", props={"value": 9000}, opts=InvokeOptions(provider=prov)).value
|
||||
return value["value"]
|
||||
|
||||
res = MyResource("resourceA", do_provider_invoke())
|
||||
|
@ -51,8 +51,8 @@ res.value.apply(lambda v: assert_eq(v, 9001))
|
|||
# performing the invoke.
|
||||
componentRes = MyComponent("resourceB", opts=ResourceOptions(providers={"test": prov}))
|
||||
|
||||
async def do_provider_invoke_with_parent(parent):
|
||||
value = await invoke("test:index:MyFunctionWithParent", props={"value": 41}, opts=InvokeOptions(parent=parent))
|
||||
def do_provider_invoke_with_parent(parent):
|
||||
value = invoke("test:index:MyFunctionWithParent", props={"value": 41}, opts=InvokeOptions(parent=parent)).value
|
||||
return value["value"]
|
||||
|
||||
res2 = MyResource("resourceC", do_provider_invoke_with_parent(componentRes))
|
||||
|
|
|
@ -11,13 +11,13 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import asyncio
|
||||
from pulumi import CustomResource, Output, log
|
||||
from pulumi.runtime import invoke
|
||||
|
||||
def assert_eq(l, r):
|
||||
assert l == r
|
||||
|
||||
|
||||
class MyResource(CustomResource):
|
||||
value: Output[int]
|
||||
|
||||
|
@ -26,14 +26,20 @@ class MyResource(CustomResource):
|
|||
"value": value,
|
||||
})
|
||||
|
||||
async def get_value2():
|
||||
await asyncio.sleep(0)
|
||||
return 42
|
||||
|
||||
value = invoke("test:index:MyFunction", props={
|
||||
"value": 41,
|
||||
})
|
||||
def do_invoke():
|
||||
value = invoke("test:index:MyFunction", props={"value": 41, "value2": get_value2()}).value
|
||||
return value["value"]
|
||||
|
||||
async def do_invoke():
|
||||
value = await invoke("test:index:MyFunction", props={"value": 41})
|
||||
async def await_invoke():
|
||||
value = await invoke("test:index:MyFunction", props={"value": 41, "value2": get_value2()})
|
||||
return value["value"]
|
||||
|
||||
res = MyResource("resourceA", do_invoke())
|
||||
res.value.apply(lambda v: assert_eq(v, 42))
|
||||
|
||||
res2 = MyResource("resourceB", await_invoke())
|
||||
res2.value.apply(lambda v: assert_eq(v, 42))
|
||||
|
|
|
@ -19,13 +19,14 @@ class TestInvoke(LanghostTest):
|
|||
def test_invoke_success(self):
|
||||
self.run_test(
|
||||
program=path.join(self.base_path(), "invoke"),
|
||||
expected_resource_count=1)
|
||||
expected_resource_count=2)
|
||||
|
||||
def invoke(self, _ctx, token, args, provider, _version):
|
||||
self.assertEqual("test:index:MyFunction", token)
|
||||
self.assertEqual("", provider)
|
||||
self.assertDictEqual({
|
||||
"value": 41,
|
||||
"value2": 42,
|
||||
}, args)
|
||||
|
||||
return [], {
|
||||
|
@ -36,7 +37,6 @@ class TestInvoke(LanghostTest):
|
|||
_parent, _custom, _protect, _provider, _property_deps, _delete_before_replace,
|
||||
_ignore_changes, _version):
|
||||
self.assertEqual("test:index:MyResource", ty)
|
||||
self.assertEqual("resourceA", name)
|
||||
self.assertEqual(resource["value"], 42)
|
||||
|
||||
return {
|
||||
|
@ -57,6 +57,7 @@ class TestInvokeWithFailures(LanghostTest):
|
|||
self.assertEqual("test:index:MyFunction", token)
|
||||
self.assertDictEqual({
|
||||
"value": 41,
|
||||
"value2": 42,
|
||||
}, args)
|
||||
|
||||
return [{"property": "value", "reason": "the invoke failed"}], {}
|
||||
|
|
|
@ -11,11 +11,13 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import asyncio
|
||||
import pulumi
|
||||
from pulumi.runtime import invoke
|
||||
|
||||
async def do_invoke():
|
||||
value = await invoke("test:index:MyFunction", props={"value": 41})
|
||||
await asyncio.sleep(0)
|
||||
value = invoke("test:index:MyFunction", props={"value": 41}).value
|
||||
return value["value"]
|
||||
|
||||
pulumi.export("value", do_invoke())
|
||||
|
|
Loading…
Reference in a new issue