pulumi/sdk/dotnet/Pulumi/Deployment/Deployment.Runner.cs
Josh Studt 963b5ab710
[Automation API] - C# Implementation (#5761)
* Init Workspace interface for C# Automation API

* fleshing out workspace interface and beginning of local workspace implementation

* initial run pulumi cmd implementation

* resolve issue with pulumi cmd cleanup wrapper task after testing

* flesh out local workspace implementation, flesh out stack implementation, cleanup run pulumi cmd implementation and make it an instance so it is mockable/testable, separate serialization in prep for custom converters

* project settings json serialization implemented

* Initial commit of language server

* Add deployment from language server

* Cleanup

* finish json serialization

* project runtime yaml serialization completed. just need stack config value yaml serialization

* Remove typed argument

* Limit concurrency

* Rename file for consistency

* final commit of a semi-working project settings & stack settings serialization so that it is in the commit history

* modify workspace API so that settings accessors aren't fully exposed since we are defering a complete serialization implementation until a later date

* yaml converters wrap any outgoing exceptions so resolve that

* getting the beginning of inline program GRPC communication set up

* stack lifecycle operations implemented, and switched to newtonsoft for JSON serialization

* change back to system.text.json with a custom object converter

* local workspace tests written, working on getting them passing

* fix the encoding on the GO files used for testing

* all tests passing except inline program, pulumi engine not available with inline

* inline program engine is now running as expecting, but inline program is not recognizing local stack config

* All tests passing, but no concurrency capability because of the singleton DeploymentInstance.

* cleanup unnecessary usings

* minor cleanup / changes after a quick review. Make sure ConfigureAwait is used where needed. Remove newtonsoft dependency from testing. Update workspace API to use existing PluginKind enum. Modify LanguageRuntimeService so that its semaphore operates process-wide.

* support for parallel execution of inline program, test included

* Update LocalWorkspaceTests.cs

remove some redundancy from the inline program parallel execution text

* flesh out some comments and make asynclocal instance readonly

* Strip out instance locking since it is no longer necessary with AsyncLocal wrapping the Deployment.Instance. Modify CreateRunner method such that we are ensuring there isn't a chance of delayed synchronous execution polluting the value of Deployment.Instance across calls to Deployment.RunAsync

* resolve conflicts with changes made to Deployment.TestAsync entrypoints

* update changelog

* write a test that fails if the CreateRunnerAndRunAsync method on Deployment is not marked async and fix test project data file ref

* make resource package state share the lifetime of the deployment so that their isn't cross deployment issues with resource packages, add support and tests for external resource packages (resource packages that aren't referenced by the executing assembly)

* enable parallel test collection execution in test suite, add some additional tests for deployment instance protection and ensuring that our first class stack exceptions are thrown when expected

* minor inline project name arg change, and re-add xunit json to build output (whoops)

* strip out concurrency changes since they are now in #6139, split automation into separate assembly, split automation tests into separate assembly

* add copyright to the top of each new file

* resolve some PR remarks

* inline program exception is now properly propagated to the caller on UpAsync and PreviewAsync

* modify PulumiFn to allow TStack and other delegate overloads without needing multiple first class delegates.

* whoops missing a copyright

* resolve getting TStack into IRunner so that outputs are registered correctly and so that there isn't 2 instances of Pulumi.Stack instantiated.

* resolve issue with propagation of TStack exceptions and add a test

* add support for a TStack PulumiFn resolved via IServiceProvider

* update automation API description

* fix comment and remove unnecessary TODOs

* disable packaging of automation api assembly

* re-name automation api documentation file appropriately

* add --limit support to dotnet automation api for stack history per #6257

* re-name XStack as WorkspaceStack

* replace --limit usage with --page-size and --page in dotnet automation api per #6292

Co-authored-by: evanboyle <evan@pulumi.com>
Co-authored-by: Josh Studt <josh.studt@figmarketing.com>
Co-authored-by: Dan Friedman <dan@thefriedmans.org>
Co-authored-by: David Ferretti <David.Ferretti@figmarketing.com>
Co-authored-by: Mikhail Shilkov <github@mikhail.io>
2021-02-18 11:36:21 +01:00

229 lines
10 KiB
C#

// Copyright 2016-2020, Pulumi Corporation
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Pulumi
{
public partial class Deployment
{
private class Runner : IRunner
{
private readonly IDeploymentInternal _deployment;
/// <summary>
/// The set of tasks that we have fired off. We issue tasks in a Fire-and-Forget manner
/// to be able to expose a Synchronous <see cref="Resource"/> model for users. i.e. a
/// user just synchronously creates a resource, and we asynchronously kick off the work
/// to populate it. This works well, however we have to make sure the console app
/// doesn't exit because it thinks there is no work to do.
/// <para/>
/// To ensure that doesn't happen, we have the main entrypoint of the app just
/// continuously, asynchronously loop, waiting for these tasks to complete, and only
/// exiting once the set becomes empty.
/// </summary>
private readonly Dictionary<Task, List<string>> _inFlightTasks = new Dictionary<Task, List<string>>();
public Runner(IDeploymentInternal deployment)
=> _deployment = deployment;
public Task<int> RunAsync<TStack>(IServiceProvider serviceProvider) where TStack : Stack
{
if (serviceProvider == null)
{
throw new ArgumentNullException(nameof(serviceProvider));
}
return RunAsync(() => serviceProvider.GetService(typeof(TStack)) as TStack
?? throw new ApplicationException($"Failed to resolve instance of type {typeof(TStack)} from service provider. Register the type with the service provider before calling {nameof(RunAsync)}."));
}
public Task<int> RunAsync<TStack>() where TStack : Stack, new()
=> RunAsync(() => new TStack());
public Task<int> RunAsync<TStack>(Func<TStack> stackFactory) where TStack : Stack
{
try
{
var stack = stackFactory();
// Stack doesn't call RegisterOutputs, so we register them on its behalf.
stack.RegisterPropertyOutputs();
RegisterTask("User program code.", stack.Outputs.DataTask);
}
catch (Exception ex)
{
return HandleExceptionAsync(ex);
}
return WhileRunningAsync();
}
public Task<int> RunAsync(Func<Task<IDictionary<string, object?>>> func, StackOptions? options)
{
var stack = new Stack(func, options);
RegisterTask("User program code.", stack.Outputs.DataTask);
return WhileRunningAsync();
}
public void RegisterTask(string description, Task task)
{
Serilog.Log.Information($"Registering task: {description}");
lock (_inFlightTasks)
{
// We may get several of the same tasks with different descriptions. That can
// happen when the runtime reuses cached tasks that it knows are value-identical
// (for example Task.CompletedTask). In that case, we just store all the
// descriptions. We'll print them all out as done once this task actually
// finishes.
if (!_inFlightTasks.TryGetValue(task, out var descriptions))
{
descriptions = new List<string>();
_inFlightTasks.Add(task, descriptions);
}
descriptions.Add(description);
}
}
// Keep track if we already logged the information about an unhandled error to the user. If
// so, we end with a different exit code. The language host recognizes this and will not print
// any further messages to the user since we already took care of it.
//
// 32 was picked so as to be very unlikely to collide with any other error codes.
private const int _processExitedAfterLoggingUserActionableMessage = 32;
private async Task<int> WhileRunningAsync()
{
var tasks = new List<Task>();
// Keep looping as long as there are outstanding tasks that are still running.
while (true)
{
tasks.Clear();
lock (_inFlightTasks)
{
if (_inFlightTasks.Count == 0)
{
// No more tasks in flight: exit the loop.
break;
}
// Grab all the tasks we currently have running.
tasks.AddRange(_inFlightTasks.Keys);
}
// Wait for one of the two events to happen:
// 1. All tasks in the list complete successfully, or
// 2. Any task throws an exception.
// There's no standard API with this semantics, so we create a custom completion source that is
// completed when remaining count is zero, or when an exception is thrown.
var remaining = tasks.Count;
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
tasks.ForEach(HandleCompletion);
async void HandleCompletion(Task task)
{
try
{
// Wait for the task completion.
await task.ConfigureAwait(false);
// Log the descriptions of completed tasks.
List<string> descriptions;
lock (_inFlightTasks)
{
descriptions = _inFlightTasks[task];
}
foreach (var description in descriptions)
{
Serilog.Log.Information($"Completed task: {description}");
}
// Check if all the tasks are completed and signal the completion source if so.
if (Interlocked.Decrement(ref remaining) == 0)
{
tcs.TrySetResult(0);
}
}
catch (OperationCanceledException)
{
tcs.TrySetCanceled();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
finally
{
// Once finished, remove the task from the set of tasks that are running.
lock (_inFlightTasks)
{
_inFlightTasks.Remove(task);
}
}
}
try
{
// Now actually await that combined task and realize any exceptions it may have thrown.
await tcs.Task.ConfigureAwait(false);
}
catch (Exception e)
{
// if it threw, report it as necessary, then quit.
return await HandleExceptionAsync(e).ConfigureAwait(false);
}
}
// there were no more tasks we were waiting on. Quit out, reporting if we had any
// errors or not.
return _deployment.Logger.LoggedErrors ? 1 : 0;
}
private async Task<int> HandleExceptionAsync(Exception exception)
{
if (exception is LogException)
{
// We got an error while logging itself. Nothing to do here but print some errors
// and fail entirely.
Serilog.Log.Error(exception, "Error occurred trying to send logging message to engine.");
await Console.Error.WriteLineAsync("Error occurred trying to send logging message to engine:\n" + exception).ConfigureAwait(false);
return 1;
}
// For the rest of the issue we encounter log the problem to the error stream. if we
// successfully do this, then return with a special error code stating as such so that
// our host doesn't print out another set of errors.
//
// Note: if these logging calls fail, they will just end up bubbling up an exception
// that will be caught by nothing. This will tear down the actual process with a
// non-zero error which our host will handle properly.
if (exception is RunException)
{
// Always hide the stack for RunErrors.
await _deployment.Logger.ErrorAsync(exception.Message).ConfigureAwait(false);
}
else if (exception is ResourceException resourceEx)
{
var message = resourceEx.HideStack
? resourceEx.Message
: resourceEx.ToString();
await _deployment.Logger.ErrorAsync(message, resourceEx.Resource).ConfigureAwait(false);
}
else
{
var location = System.Reflection.Assembly.GetEntryAssembly()?.Location;
await _deployment.Logger.ErrorAsync(
$@"Running program '{location}' failed with an unhandled exception:
{exception.ToString()}").ConfigureAwait(false);
}
Serilog.Log.Debug("Wrote last error. Returning from program.");
return _processExitedAfterLoggingUserActionableMessage;
}
}
}
}