Add 'Output.all' combinator for Python (#2293)

* Add 'Output.all' combinator for Python

Output.all is a useful combinator that we already have in Node that
allows the composition of a list of outputs into an output of a list.
This is very useful when authoring components and its lack of presence
in Python was an oversight.

This commit adds 'Output.all' and 'Output.from_input', adding tests and
documentation for each.

* start unwrap

* Add functionality and test for nested inputs
This commit is contained in:
Sean Gillespie 2018-12-18 13:22:04 -08:00 committed by GitHub
parent e86ea70ea0
commit 81c0de1e4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 360 additions and 1 deletions

View file

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from functools import reduce
from inspect import isawaitable
from typing import (
TypeVar,
@ -23,6 +24,7 @@ from typing import (
cast,
Mapping,
Any,
List,
TYPE_CHECKING
)
@ -172,3 +174,84 @@ class Output(Generic[T]):
Syntax sugar for looking up attributes dynamically off of outputs.
"""
return self.apply(lambda v: v[key])
@staticmethod
def from_input(val: Input[T]) -> 'Output[T]':
"""
Takes an Input value and produces an Output value from it, deeply unwrapping nested Input values as necessary
given the type.
"""
# Is it an output already? Recurse into the value contained within it.
if isinstance(val, Output):
return val.apply(Output.from_input)
# Is a dict or list? Recurse into the values within them.
if isinstance(val, dict):
# Since Output.all works on lists early, serialize this dictionary into a list of lists first.
# Once we have a output of the list of properties, we can use an apply to re-hydrate it back into a dict.
transformed_items = [[k, Output.from_input(v)] for k, v in val.items()]
return Output.all(*transformed_items).apply(lambda props: {k: v for k, v in props})
if isinstance(val, list):
transformed_items = [Output.from_input(v) for v in val]
return Output.all(*transformed_items)
# If it's not an output, list, or dict, it must be known.
is_known_fut = asyncio.Future()
is_known_fut.set_result(True)
# Is it awaitable? If so, schedule it for execution and use the resulting future
# as the value future for a new output.
if isawaitable(val):
promise_output = Output(set(), asyncio.ensure_future(val), is_known_fut)
return promise_output.apply(Output.from_input)
# Is it a prompt value? Set up a new resolved future and use that as the value future.
value_fut = asyncio.Future()
value_fut.set_result(val)
return Output(set(), value_fut, is_known_fut)
@staticmethod
def all(*args: List[Input[T]]) -> 'Output[List[T]]':
"""
Produces an Output of Lists from a List of Inputs.
This function can be used to combine multiple, separate Inputs into a single
Output which can then be used as the target of `apply`. This can be used
in the following manner:
```python
d1: Output[str]
d2: Output[str]
d3: Output[str] = Output.all(d1, d2).apply(lambda l: l[0] + l[1])
```
Resource dependencies are preserved in the returned Output.
"""
# Two asynchronous helper functions to assist in the implementation:
# is_known, which returns True if all of the input's values are known,
# and false if any of them are not known,
async def is_known(outputs):
is_known_futures = list(map(lambda o: o._is_known, outputs))
each_is_known = await asyncio.gather(*is_known_futures)
return all(each_is_known)
# gather_futures, which aggregates the list of futures in each input to a future of a list.
async def gather_futures(outputs):
value_futures = list(map(lambda o: asyncio.ensure_future(o.future()), outputs))
return await asyncio.gather(*value_futures)
# First, map all inputs to outputs using `from_input`.
all_outputs = list(map(Output.from_input, args))
# Merge the list of resource dependencies across all inputs.
resources = reduce(lambda acc, r: acc.union(r.resources()), all_outputs, set())
# Aggregate the list of futures into a future of lists.
value_futures = asyncio.ensure_future(gather_futures(all_outputs))
# Aggregate whether or not this output is known.
known_futures = asyncio.ensure_future(is_known(all_outputs))
return Output(resources, value_futures, known_futures)

View file

@ -0,0 +1,13 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@ -0,0 +1,45 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pulumi import Output, CustomResource
class MyResource(CustomResource):
number: Output[str]
def __init__(self, name):
CustomResource.__init__(self, "test:index:MyResource", name, {
"number": None,
})
class FinalResource(CustomResource):
number: Output[str]
def __init__(self, name, number):
CustomResource.__init__(self, "test:index:FinalResource", name, {
"number": number,
})
def assert_eq(lhs, rhs):
assert lhs == rhs
res1 = MyResource("testResource1")
res2 = MyResource("testResource2")
res1.number.apply(lambda n: assert_eq(n, 2))
res2.number.apply(lambda n: assert_eq(n, 3))
# Output.all combines a list of outputs into an output of a list.
resSum = Output.all(res1.number, res2.number).apply(lambda l: l[0] + l[1])
FinalResource("testResource3", resSum)

View file

@ -0,0 +1,47 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import path
from ..util import LanghostTest
class OutputAllTest(LanghostTest):
"""
"""
def test_output_all(self):
self.run_test(
program=path.join(self.base_path(), "output_all"),
expected_resource_count=3)
def register_resource(self, _ctx, _dry_run, ty, name, resource,
_dependencies):
number = 0
if name == "testResource1":
self.assertEqual(ty, "test:index:MyResource")
number = 2
elif name == "testResource2":
self.assertEqual(ty, "test:index:MyResource")
number = 3
elif name == "testResource3":
self.assertEqual(ty, "test:index:FinalResource")
# The source program uses Output.apply to merge outputs from the above two resources.
# The 5 is produced by adding 2 and 3 in the source program.
self.assertEqual(resource["number"], 5)
number = resource["number"]
return {
"urn": self.make_urn(ty, name),
"id": name,
"object": {
"number": number
}
}

View file

@ -0,0 +1,13 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@ -0,0 +1,38 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pulumi import Output, CustomResource
class MyResource(CustomResource):
nested_numbers: Output[dict]
def __init__(self, name):
CustomResource.__init__(self, "test:index:MyResource", name, {
"nested_numbers": None,
})
class SumResource(CustomResource):
sum: Output[int]
def __init__(self, name, sum):
CustomResource.__init__(self, "test:index:SumResource", name, {
"sum": sum,
})
res1 = MyResource("testResource1")
res2 = MyResource("testResource2")
sum = Output.from_input(res1.nested_numbers).apply(lambda d: d["foo"]["bar"] + d["baz"])
sumRes = SumResource("sumResource", sum)

View file

@ -0,0 +1,56 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import path
from ..util import LanghostTest
class OutputNestedTest(LanghostTest):
def test_output_nested(self):
self.run_test(
program=path.join(self.base_path(), "output_nested"),
expected_resource_count=3)
def register_resource(self, _ctx, _dry_run, ty, name, resource,
_dependencies):
nested_numbers = None
if name == "testResource1":
self.assertEqual(ty, "test:index:MyResource")
nested_numbers = {
"foo": {
"bar": 9
},
"baz": 1,
}
elif name == "testResource2":
self.assertEqual(ty, "test:index:MyResource")
nested_numbers = {
"foo": {
"bar": 99,
},
"baz": 1,
}
elif name == "sumResource":
self.assertEqual(ty, "test:index:SumResource")
# The source program uses Output.apply to merge outputs from the above two resources.
# The 10 is produced by adding 9 and 1 in the source program, derived from nested properties of the
# testResource1 nested_numbers property.
self.assertEqual(resource["sum"], 10)
nested_numbers = resource["sum"]
return {
"urn": self.make_urn(ty, name),
"id": name,
"object": {
"nested_numbers": nested_numbers
}
}

View file

@ -104,7 +104,7 @@ class NextSerializationTests(unittest.TestCase):
self.assertEqual("hello.txt", prop["path"])
@async_test
async def test_file_asset(self):
async def test_remote_asset(self):
asset = RemoteAsset("https://pulumi.io")
prop = await rpc.serialize_property(asset, [])
self.assertEqual(rpc._special_asset_sig, prop[rpc._special_sig_key])
@ -125,6 +125,70 @@ class NextSerializationTests(unittest.TestCase):
self.assertListEqual(deps, [existing, res])
self.assertEqual(42, prop)
@async_test
async def test_output_all(self):
res = FakeCustomResource("some-resource")
fut = asyncio.Future()
fut.set_result(42)
known_fut = asyncio.Future()
known_fut.set_result(True)
out = Output({res}, fut, known_fut)
other = Output.from_input(99)
combined = Output.all(out, other)
deps = []
prop = await rpc.serialize_property(combined, deps)
self.assertListEqual(deps, [res])
self.assertEqual([42, 99], prop)
@async_test
async def test_output_all_composes_dependencies(self):
res = FakeCustomResource("some-resource")
fut = asyncio.Future()
fut.set_result(42)
known_fut = asyncio.Future()
known_fut.set_result(True)
out = Output({res}, fut, known_fut)
other = FakeCustomResource("some-other-resource")
other_fut = asyncio.Future()
other_fut.set_result(99)
other_known_fut = asyncio.Future()
other_known_fut.set_result(True)
other_out = Output({other}, other_fut, other_known_fut)
combined = Output.all(out, other_out)
deps = []
prop = await rpc.serialize_property(combined, deps)
self.assertSetEqual(set(deps), {res, other})
self.assertEqual([42, 99], prop)
@async_test
async def test_output_all_known_if_all_are_known(self):
res = FakeCustomResource("some-resource")
fut = asyncio.Future()
fut.set_result(42)
known_fut = asyncio.Future()
known_fut.set_result(True)
out = Output({res}, fut, known_fut)
other = FakeCustomResource("some-other-resource")
other_fut = asyncio.Future()
other_fut.set_result(99)
other_known_fut = asyncio.Future()
other_known_fut.set_result(False) # <- not known
other_out = Output({other}, other_fut, other_known_fut)
combined = Output.all(out, other_out)
deps = []
prop = await rpc.serialize_property(combined, deps)
self.assertSetEqual(set(deps), {res, other})
# The contents of the list are unknown if any of the Outputs used to
# create it were unknown.
self.assertEqual(rpc.UNKNOWN, prop)
@async_test
async def test_unknown_output(self):
res = FakeCustomResource("some-dependency")