pulumi/sdk/python/lib/test/langhost/invoke/test_invoke.py
Pat Gavlin 63eb7abb59
Make pulumi.runtime.invoke synchronous. (#3019)
These changes make the `pulumi.runtime.invoke` function invokable in a
synchronous manner. Because this function still needs to perform
asynchronous work under the covers--namely awaiting a provider URN and
ID if a provider instance is present in the `InvokeOptions`--this
requires some creativity. This creativity comes in the form of a helper
function, `_sync_await`, that performs a logical yield from the
currently running event, manually runs the event loop until the given
future completes, performs a logical resume back to the
currently executing event, and returns the result of the future.

The code in `_sync_await` is a bit scary, as it relies upon knowledge of
(and functions in) the internals of the `asyncio` package. The necessary
work performed in this function was derived from the implementations of
`task_step` (which pointed out the need to call `_{enter,leave}_task`)
and `BaseEventLoop.run_forever` (which illustrated how the event loop is
pumped). In addition to potential breaking changes to these internals,
the code may not work if a user has provided an alternative implementation
for `EventLoop`. That said, the code is a close enough copy of
`BaseEventLoop.run_forever` that it should be a reasonable solution.
2019-08-02 14:19:56 -07:00

64 lines
2.1 KiB
Python

# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import path
from ..util import LanghostTest
class TestInvoke(LanghostTest):
def test_invoke_success(self):
self.run_test(
program=path.join(self.base_path(), "invoke"),
expected_resource_count=2)
def invoke(self, _ctx, token, args, provider, _version):
self.assertEqual("test:index:MyFunction", token)
self.assertEqual("", provider)
self.assertDictEqual({
"value": 41,
"value2": 42,
}, args)
return [], {
"value": args["value"] + 1
}
def register_resource(self, _ctx, _dry_run, ty, name, resource, _deps,
_parent, _custom, _protect, _provider, _property_deps, _delete_before_replace,
_ignore_changes, _version):
self.assertEqual("test:index:MyResource", ty)
self.assertEqual(resource["value"], 42)
return {
"urn": self.make_urn(ty, name),
"id": name,
"object": resource,
}
class TestInvokeWithFailures(LanghostTest):
def test_invoke_failure(self):
self.run_test(
program=path.join(self.base_path(), "invoke"),
expected_resource_count=0,
expected_error="Program exited with non-zero exit code: 1")
def invoke(self, _ctx, token, args, _provider, _version):
self.assertEqual("test:index:MyFunction", token)
self.assertDictEqual({
"value": 41,
"value2": 42,
}, args)
return [{"property": "value", "reason": "the invoke failed"}], {}