[sdk/python] Fix mocks issue when passing a resource more than once (#6479)
This change avoids `RuntimeError: There is no current event loop in thread '<thread_name>'` errors when passing a resource as an input multiple times when using mocks. The problem is that when using mocks, we deserialize the gRPC inputs before passing them to the user's mock methods. Deserializing inputs doesn't typically require an event loop, however, during deserialization of resource references, we end up creating some instances of `Future`, which does require an event loop to be present for the current thread. If this is done multiple times for a resource, it's possible that `deserialize_properties` will be called on an asyncio thread that doesn't yet have an event loop, resulting in the error being raised. The error does not occur when only passing the resource reference once because typically the thread (e.g. `asyncio_0`) used in that case will have already had an event loop created for it due to the use of the internal `_syncawait` when _serializing_ the source resource's properties, which ensures an event loop is set for the thread. The fix is to ensure an event loop is created for the thread in the mocks implementation before calling `deserialize_properties`.
This commit is contained in:
parent
20e84dfe9e
commit
0b1414dc6d
|
@ -14,3 +14,5 @@
|
|||
|
||||
### Bug Fixes
|
||||
|
||||
- [sdk/python] Fix mocks issue when passing a resource more than once.
|
||||
[#6479](https://github.com/pulumi/pulumi/pull/6479)
|
||||
|
|
|
@ -23,7 +23,7 @@ from typing import Dict, NamedTuple, Optional, Tuple, TYPE_CHECKING
|
|||
from google.protobuf import empty_pb2
|
||||
from . import rpc
|
||||
from .settings import Settings, configure, get_stack, get_project, get_root_resource
|
||||
from .sync_await import _sync_await
|
||||
from .sync_await import _ensure_event_loop, _sync_await
|
||||
from ..runtime.proto import engine_pb2, provider_pb2, resource_pb2
|
||||
from ..runtime.stack import Stack, run_pulumi_func
|
||||
|
||||
|
@ -91,6 +91,9 @@ class MockMonitor:
|
|||
return "urn:pulumi:" + "::".join([get_stack(), get_project(), type_, name])
|
||||
|
||||
def Invoke(self, request):
|
||||
# Ensure we have an event loop on this thread because it's needed when deserializing resource references.
|
||||
_ensure_event_loop()
|
||||
|
||||
args = rpc.deserialize_properties(request.args)
|
||||
|
||||
if request.tok == "pulumi:pulumi:getResource":
|
||||
|
@ -109,6 +112,9 @@ class MockMonitor:
|
|||
return provider_pb2.InvokeResponse(**fields)
|
||||
|
||||
def ReadResource(self, request):
|
||||
# Ensure we have an event loop on this thread because it's needed when deserializing resource references.
|
||||
_ensure_event_loop()
|
||||
|
||||
state = rpc.deserialize_properties(request.properties)
|
||||
|
||||
id_, state = self.mocks.new_resource(request.type, request.name, state, request.provider, request.id)
|
||||
|
@ -127,6 +133,9 @@ class MockMonitor:
|
|||
if request.type == "pulumi:pulumi:Stack":
|
||||
return resource_pb2.RegisterResourceResponse(urn=urn)
|
||||
|
||||
# Ensure we have an event loop on this thread because it's needed when deserializing resource references.
|
||||
_ensure_event_loop()
|
||||
|
||||
inputs = rpc.deserialize_properties(request.object)
|
||||
|
||||
id_, state = self.mocks.new_resource(request.type, request.name, inputs, request.provider, request.importId)
|
||||
|
|
|
@ -45,15 +45,7 @@ def _sync_await(awaitable: Awaitable[Any]) -> Any:
|
|||
"""
|
||||
|
||||
# Fetch the current event loop and ensure a future.
|
||||
loop = None
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
except RuntimeError:
|
||||
pass
|
||||
if loop is None:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
loop = _ensure_event_loop()
|
||||
fut = asyncio.ensure_future(awaitable)
|
||||
|
||||
# If the loop is not running, we can just use run_until_complete. Without this, we would need to duplicate a fair
|
||||
|
@ -93,3 +85,14 @@ def _sync_await(awaitable: Awaitable[Any]) -> Any:
|
|||
|
||||
# Return the result of the future.
|
||||
return fut.result()
|
||||
|
||||
|
||||
def _ensure_event_loop():
|
||||
"""Ensures an asyncio event loop exists for the current thread."""
|
||||
loop = None
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
except RuntimeError:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
return loop
|
||||
|
|
|
@ -56,6 +56,11 @@ myinstance = Instance("instance",
|
|||
mycustom = MyCustom("mycustom", {"instance": myinstance})
|
||||
invoke_result = do_invoke()
|
||||
|
||||
# Pass myinstance several more times to ensure deserialization of the resource reference
|
||||
# works on other asyncio threads.
|
||||
for x in range(5):
|
||||
MyCustom(f"mycustom{x}", {"instance": myinstance})
|
||||
|
||||
pulumi.export("hello", "world")
|
||||
pulumi.export("outprop", mycomponent.outprop)
|
||||
pulumi.export("public_ip", myinstance.public_ip)
|
||||
|
|
|
@ -67,6 +67,9 @@ class TestingWithMocks(unittest.TestCase):
|
|||
def test_custom_resource_reference(self):
|
||||
def check_instance(instance):
|
||||
self.assertIsInstance(instance, resources.Instance)
|
||||
def check_ip(ip):
|
||||
self.assertEqual(ip, '203.0.113.12')
|
||||
instance.public_ip.apply(check_ip)
|
||||
return resources.mycustom.instance.apply(check_instance)
|
||||
|
||||
@pulumi.runtime.test
|
||||
|
|
Loading…
Reference in a new issue