Rework registration.

Instead of using .ContinueWith all over the place, register a single
ContinueWith call when we begin resource registration. When it
completes, we then call a callback on each resource that uses a
TaskCompletionSource to complete a task that represent the output
property.

This reduces the number of continuations we'll have floating around
and makes the code a little easier to grok.
This commit is contained in:
Matt Ellis 2018-06-21 11:48:28 -07:00
parent 791ff7e941
commit 3a820ddead
5 changed files with 78 additions and 41 deletions

View file

@ -1,19 +1,30 @@
using Google.Protobuf.WellKnownTypes;
using Pulumirpc;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Pulumi {
public class CustomResource : Resource {
public abstract class CustomResource : Resource {
protected Task<Pulumirpc.RegisterResourceResponse> m_registrationResponse;
public Task<string> Id { get; private set;}
public CustomResource(string type, string name, Dictionary<string, object> properties = null, ResourceOptions options = default(ResourceOptions)) {
m_registrationResponse = RegisterAsync(type, name, true, properties, options);
TaskCompletionSource<string> m_IdCompletionSoruce;
public CustomResource() {
m_IdCompletionSoruce = new TaskCompletionSource<string>();
Id = m_IdCompletionSoruce.Task;
}
Id = m_registrationResponse.ContinueWith((x) => {
return x.Result.Id;
});
protected override void OnResourceRegistrationCompete(Task<RegisterResourceResponse> resp) {
base.OnResourceRegistrationCompete(resp);
if (resp.IsCanceled) {
m_IdCompletionSoruce.SetCanceled();
} else if (resp.IsFaulted) {
m_IdCompletionSoruce.SetException(resp.Exception);
} else {
m_IdCompletionSoruce.SetResult(resp.Result.Id);
}
}
}
}

View file

@ -9,14 +9,27 @@ namespace Pulumi
public abstract class Resource
{
public Task<string> Urn { get; private set; }
private TaskCompletionSource<string> m_UrnCompletionSource;
public const string UnkownResourceId = "04da6b54-80e4-46f7-96ec-b56ff0331ba9";
protected Resource()
{
m_UrnCompletionSource = new TaskCompletionSource<string>();
Urn = m_UrnCompletionSource.Task;
}
public Task<RegisterResourceResponse> RegisterAsync(string type, string name, bool custom, Dictionary<string, object> properties, ResourceOptions options) {
protected virtual void OnResourceRegistrationCompete(Task<RegisterResourceResponse> resp) {
if (resp.IsCanceled) {
m_UrnCompletionSource.SetCanceled();
} else if (resp.IsFaulted) {
m_UrnCompletionSource.SetException(resp.Exception);
} else {
m_UrnCompletionSource.SetResult(resp.Result.Urn);
}
}
public async void RegisterAsync(string type, string name, bool custom, Dictionary<string, object> properties, ResourceOptions options) {
Serilog.Log.Debug("RegisterAsync({type}, {name})", type, name);
if (string.IsNullOrEmpty(type))
@ -29,7 +42,6 @@ namespace Pulumi
throw new ArgumentException(nameof(name));
}
// Figure out the parent URN. If an explicit parent was passed in, use that. Otherwise use the global root URN. In the case where that hasn't been set yet, we must be creating
// the ComponentResource that represents the global stack object, so pass along no parent.
Task<string> parentUrn;
@ -41,23 +53,23 @@ namespace Pulumi
parentUrn = Task.FromResult("");
}
var res = Runtime.Monitor.RegisterResourceAsync(
// Kick off the registration, and when it completes, call the OnResourceRegistrationCompete method which will resolve all the tasks to their values. The fact that we don't
// await here is by design. This method is called by child classes in their constructors, where were do not want to block.
#pragma warning disable 4014
Runtime.Monitor.RegisterResourceAsync(
new RegisterResourceRequest()
{
Type = type,
Name = name,
Custom = custom,
Protect = false,
Object = SerializeProperties(properties),
Object = await SerializeProperties(properties),
Parent = parentUrn.Result
}
);
Urn = res.ResponseAsync.ContinueWith((x) => x.Result.Urn);
return res.ResponseAsync;
}).ResponseAsync.ContinueWith((x) => OnResourceRegistrationCompete(x));;
#pragma warning restore 4014
}
private Struct SerializeProperties(Dictionary<string, object> properties) {
private async Task<Struct> SerializeProperties(Dictionary<string, object> properties) {
if (properties == null) {
return new Struct();
}
@ -65,19 +77,19 @@ namespace Pulumi
var s = new Struct();
foreach (var kvp in properties) {
s.Fields.Add(kvp.Key, SerializeProperty(kvp.Value));
s.Fields.Add(kvp.Key, await SerializeProperty(kvp.Value));
}
return s;
}
private Value SerializeProperty(object o) {
private async Task<Value> SerializeProperty(object o) {
Serilog.Log.Debug("SerializeProperty({o})", o);
var input = o as IInput;
if (input != null) {
// Get the ground value.
var v = input.GetTask().Result;
var v = await input.GetTask();
if (v == null) {
return Value.ForNull();
@ -90,7 +102,7 @@ namespace Pulumi
// We marshal custom resources as strings of their provider generated IDs.
var cr = v as CustomResource;
if (cr != null) {
return Value.ForString(cr.Id.Result);
return Value.ForString(await cr.Id);
}
throw new NotImplementedException($"cannot marshal Input with underlying type ${input.GetType()}");

View file

@ -2,6 +2,7 @@
// the shape right "by hand" and then work on the code-gen to stub everything else out:
using Pulumi;
using Pulumirpc;
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
@ -11,19 +12,34 @@ namespace AWS.S3 {
// TODO(ellismg): These should be Output<T>.
public Task<string> BucketDomainName { get; private set; }
public Bucket(string name, BucketArgs args = default(BucketArgs), ResourceOptions opts = default(ResourceOptions)) :
base("aws:s3/bucket:Bucket", name, new Dictionary<string, object> {
{"acl", args.Acl},
}, opts)
private TaskCompletionSource<string> m_BucketDomainNameCompletionSource;
public Bucket(string name, BucketArgs args = default(BucketArgs), ResourceOptions opts = default(ResourceOptions))
{
BucketDomainName = m_registrationResponse.ContinueWith((x) => {
var fields = x.Result.Object.Fields;
m_BucketDomainNameCompletionSource = new TaskCompletionSource<string>();
BucketDomainName = m_BucketDomainNameCompletionSource.Task;
// TODO(ellismg): This is not correct, the result from the provider may not contain this key in the case
// where it's value is unknown, e.g. during an initial preview. When we do the Output<T> work here
// we'll need to have a better story.
return fields.ContainsKey("bucketDomainName") ? fields["bucketDomainName"].StringValue : null;
});
RegisterAsync("aws:s3/bucket:Bucket", name, true, new Dictionary<string, object> {
{"acl", args.Acl},
}, opts);
}
protected override void OnResourceRegistrationCompete(Task<RegisterResourceResponse> resp)
{
base.OnResourceRegistrationCompete(resp);
if (resp.IsCanceled) {
m_BucketDomainNameCompletionSource.SetCanceled();
} else if (resp.IsFaulted) {
m_BucketDomainNameCompletionSource.SetException(resp.Exception);
}
var fields = resp.Result.Object.Fields;
foreach (var kvp in fields) {
Serilog.Log.Debug("got property {key}", kvp.Key);
}
m_BucketDomainNameCompletionSource.SetResult(fields.ContainsKey("bucketDomainName") ? fields["bucketDomainName"].StringValue : null);
}
}
@ -33,16 +49,14 @@ namespace AWS.S3 {
}
public class BucketObject : CustomResource{
public BucketObject(string name, BucketObjectArgs args = default(BucketObjectArgs), ResourceOptions opts = default(ResourceOptions)) :
base("aws:s3/bucketObject:BucketObject", name, new Dictionary<string, object> {
public BucketObject(string name, BucketObjectArgs args = default(BucketObjectArgs), ResourceOptions opts = default(ResourceOptions)) {
RegisterAsync("aws:s3/bucketObject:BucketObject", name, true, new Dictionary<string, object> {
{"acl", args.Acl},
{"bucket", args.Bucket},
{"contentBase64", args.ContentBase64},
{"contentType", args.ContentType},
{"key", args.Key},
}, opts)
{
}, opts);
}
}

View file

@ -5,4 +5,4 @@ IFS="\n\t"
GOBIN=/opt/pulumi/bin go install ./cmd/pulumi-language-dotnet
export PATH=/opt/pulumi/bin:$PATH
cd examples/bucket
pulumi update --diff --yes
pulumi preview --diff

View file

@ -7,7 +7,7 @@ class Program
{
static void Main(string[] args)
{
Deployment.Run(async () => {
Deployment.Run(() => {
Config config = new Config("hello-dotnet");
// Create the bucket, and make it readable.
@ -32,9 +32,9 @@ class Program
//
// TODO(ellismg): We need to come up with a solution here. We probably want to track all the pending tasks generated
// by Pulumi during execution and await them to complete in the host itself...
Console.WriteLine($"Bucket ID id : {await bucket.Id}");
Console.WriteLine($"Content ID id : {await content.Id}");
Console.WriteLine($"https://{await bucket.BucketDomainName}/hello.txt");
Console.WriteLine($"Bucket ID id : {bucket.Id.Result}");
Console.WriteLine($"Content ID id : {content.Id.Result}");
Console.WriteLine($"https://{bucket.BucketDomainName.Result}/hello.txt");
});
}
}