Remove threading from mock tests to fix AIO waits on diff event loop (#7666)

* Remove threading from mock tests to fix AIO waits on diff event loop

* Add reference to the proper fix
This commit is contained in:
Anton Tayanovskyy 2021-08-16 18:53:15 -04:00 committed by GitHub
parent b5ee840b16
commit 4d4642c23d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 153 additions and 70 deletions

View file

@ -1,7 +1,22 @@
# Copyright 2016-2021, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
import pulumi
class MyComponent(pulumi.ComponentResource):
outprop: pulumi.Output[str]
def __init__(self, name, inprop: pulumi.Input[str] = None, opts = None):
@ -36,9 +51,6 @@ class Module(pulumi.runtime.ResourceModule):
raise Exception(f"unknown resource type {typ}")
pulumi.runtime.register_resource_module("aws", "ec2/instance", Module())
class MyCustom(pulumi.CustomResource):
instance: pulumi.Output
def __init__(self, resource_name, props: Optional[dict] = None, opts = None):
@ -49,20 +61,33 @@ def do_invoke():
value = pulumi.runtime.invoke("test:index:MyFunction", props={"value": 41}).value
return value["out_value"]
mycomponent = MyComponent("mycomponent", inprop="hello")
myinstance = Instance("instance",
name="myvm",
value=pulumi.Output.secret("secret_value"))
mycustom = MyCustom("mycustom", {"instance": myinstance})
invoke_result = do_invoke()
# Pass myinstance several more times to ensure deserialization of the resource reference
# works on other asyncio threads.
for x in range(5):
MyCustom(f"mycustom{x}", {"instance": myinstance})
def define_resources():
mycomponent = MyComponent("mycomponent", inprop="hello")
myinstance = Instance("instance",
name="myvm",
value=pulumi.Output.secret("secret_value"))
mycustom = MyCustom("mycustom", {"instance": myinstance})
invoke_result = do_invoke()
dns_ref = pulumi.StackReference("dns")
# Pass myinstance several more times to ensure deserialization of the resource reference
# works on other asyncio threads.
for x in range(5):
MyCustom(f"mycustom{x}", {"instance": myinstance})
pulumi.export("hello", "world")
pulumi.export("outprop", mycomponent.outprop)
pulumi.export("public_ip", myinstance.public_ip)
dns_ref = pulumi.StackReference("dns")
pulumi.export("hello", "world")
pulumi.export("outprop", mycomponent.outprop)
pulumi.export("public_ip", myinstance.public_ip)
return {
'mycomponent': mycomponent,
'myinstance': myinstance,
'mycustom': mycustom,
'dns_ref': dns_ref,
'invoke_result': invoke_result
}
pulumi.runtime.register_resource_module("aws", "ec2/instance", Module())

View file

@ -1,4 +1,4 @@
# Copyright 2016-2018, Pulumi Corporation.
# Copyright 2016-2021, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -11,9 +11,99 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import pulumi
from concurrent.futures import ThreadPoolExecutor
import asyncio
import grpc
import pulumi
import pytest
import resources
import unittest
@pytest.fixture
def my_resources():
loop = asyncio.get_event_loop()
loop.set_default_executor(ImmediateExecutor())
old_settings = pulumi.runtime.settings.SETTINGS
try:
pulumi.runtime.mocks.set_mocks(MyMocks())
yield resources.define_resources()
finally:
pulumi.runtime.settings.configure(old_settings)
loop.set_default_executor(ThreadPoolExecutor())
@pulumi.runtime.test
def test_component(my_resources):
def check_outprop(outprop):
assert outprop == 'output: hello'
return my_resources['mycomponent'].outprop.apply(check_outprop)
@pulumi.runtime.test
def test_custom(my_resources):
def check_ip(ip):
assert ip == '203.0.113.12'
return my_resources['myinstance'].public_ip.apply(check_ip)
@pulumi.runtime.test
def test_custom_resource_reference(my_resources):
def check_instance(instance):
assert isinstance(instance, resources.Instance)
def check_ip(ip):
assert ip == '203.0.113.12'
instance.public_ip.apply(check_ip)
return my_resources['mycustom'].instance.apply(check_instance)
@pulumi.runtime.test
def test_invoke(my_resources):
assert my_resources['invoke_result'] == 59
@pulumi.runtime.test
def test_invoke_failures(my_resources):
caught = False
try:
pulumi.runtime.invoke("test:index:FailFunction", props={})
except Exception as e:
caught = str(e)
assert 'this function fails!' in caught
@pulumi.runtime.test
def test_invoke_throws(my_resources):
caught = None
try:
pulumi.runtime.invoke("test:index:ThrowFunction", props={})
except Exception as e:
caught = str(e)
assert 'this function throws!' in caught
@pulumi.runtime.test
def test_stack_reference(my_resources):
def check_outputs(outputs):
assert outputs["haha"] == "business"
my_resources['dns_ref'].outputs.apply(check_outputs)
class GrpcError(grpc.RpcError):
@ -66,59 +156,27 @@ class MyMocks(pulumi.runtime.Mocks):
return ['', {}]
pulumi.runtime.set_mocks(MyMocks())
class ImmediateExecutor(ThreadPoolExecutor):
"""This removes multithreading from current tests. Unfortunately in
presence of multithreading the tests are flaky. The proper fix is
postponed - see https://github.com/pulumi/pulumi/issues/7663
# Now actually import the code that creates resources, and then test it.
import resources
"""
def __init__(self):
super()
self._default_executor = ThreadPoolExecutor()
class TestingWithMocks(unittest.TestCase):
@unittest.skip(reason="Skipping flaky test tracked in https://github.com/pulumi/pulumi/issues/6561")
@pulumi.runtime.test
def test_component(self):
def check_outprop(outprop):
self.assertEqual(outprop, 'output: hello')
return resources.mycomponent.outprop.apply(check_outprop)
def submit(self, fn, *args, **kwargs):
v = fn(*args, **kwargs)
return self._default_executor.submit(ImmediateExecutor._identity, v)
@pulumi.runtime.test
def test_custom(self):
def check_ip(ip):
self.assertEqual(ip, '203.0.113.12')
return resources.myinstance.public_ip.apply(check_ip)
def map(self, func, *iterables, timeout=None, chunksize=1):
raise Exception('map not implemented')
@pulumi.runtime.test
def test_custom_resource_reference(self):
def check_instance(instance):
self.assertIsInstance(instance, resources.Instance)
def check_ip(ip):
self.assertEqual(ip, '203.0.113.12')
instance.public_ip.apply(check_ip)
return resources.mycustom.instance.apply(check_instance)
def shutdown(self, wait=True, cancel_futures=False):
raise Exception('shutdown not implemented')
@pulumi.runtime.test
def test_invoke(self):
return self.assertEqual(resources.invoke_result, 59)
@pulumi.runtime.test
def test_invoke_failures(self):
caught = False
try:
pulumi.runtime.invoke("test:index:FailFunction", props={})
except Exception:
caught = True
self.assertTrue(caught)
@pulumi.runtime.test
def test_invoke_throws(self):
caught = False
try:
pulumi.runtime.invoke("test:index:ThrowFunction", props={})
except Exception:
caught = True
self.assertTrue(caught)
@pulumi.runtime.test
def test_stack_reference(self):
def check_outputs(outputs):
self.assertEqual(outputs["haha"], "business")
resources.dns_ref.outputs.apply(check_outputs)
@staticmethod
def _identity(x):
return x