Fixes for C# concurrency bugs in 7492 (#7529)

* Reproduce the issue in a failing test

* Fix

* Tentative fix

* Update sdk/dotnet/Pulumi/Deployment/TaskMonitoringHelper.cs

Co-authored-by: Justin Van Patten <jvp@justinvp.com>

* Update sdk/dotnet/Pulumi/Deployment/TaskMonitoringHelper.cs

Co-authored-by: Justin Van Patten <jvp@justinvp.com>

* Update sdk/dotnet/Pulumi/Deployment/TaskMonitoringHelper.cs

Co-authored-by: Justin Van Patten <jvp@justinvp.com>

* Update sdk/dotnet/Pulumi/Deployment/Deployment.Runner.cs

Co-authored-by: Justin Van Patten <jvp@justinvp.com>

* Do not allocate TaskCompletionSource when not needed

* Update sdk/dotnet/Pulumi/Deployment/Deployment.Runner.cs

Co-authored-by: Josh Studt <32800478+orionstudt@users.noreply.github.com>

* Fix warning

* Cache delegate

* Simplify with named tuples

* Test early exception termination

* Test logging

* Remove the smelly method of suppressing engine exceptions

* Update sdk/dotnet/Pulumi/Deployment/TaskMonitoringHelper.cs

Co-authored-by: Josh Studt <32800478+orionstudt@users.noreply.github.com>

* Fix typo; check in xml docs

* Try CI again

* Add CHANGELOG entry

* Dedup exceptions before reporting

* Lock access to _exceptions list

* Fix typos

* Version of HandleExceptionsAsync that accepts N exceptions

* Do not aggregate exceptions prematurely

* Rename private members

* Formatting

* Summary markers

* Short-circuit return

* Stylistic fixes

* Strengthen test

* Check that we have only 1 exception

* Remove defensive clause about AggregateException from the test

* Simplify TerminatesEarly test

* Remove EmptyStack

* Notes on the regression nature of the WorksUnderStress test

* Remove race condition repro as it is a poor repro, impossible to trigger from user code

* Brace style

Co-authored-by: Justin Van Patten <jvp@justinvp.com>
Co-authored-by: Josh Studt <32800478+orionstudt@users.noreply.github.com>
This commit is contained in:
Anton Tayanovskyy 2021-07-22 12:49:14 -04:00 committed by GitHub
parent 30311405ca
commit b0f51a6b2c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 470 additions and 111 deletions

View file

@ -1,4 +1,3 @@
### Improvements
- [sdk/dotnet] - Fix async await warnings
@ -15,7 +14,11 @@
### Bug Fixes
- [sdk/go] - Fix target and replace options for the Automation API.
- [sdk/dotnet] - Fix for race conditions in .NET SDK that used to
manifest as a `KeyNotFoundException` from `WhileRunningAsync`
[#7529](https://github.com/pulumi/pulumi/pull/7529)
- [sdk/go] - Fix target and replace options for the Automation API
[#7426](https://github.com/pulumi/pulumi/pull/7426)
- [cli] - Don't escape special characters when printing JSON.

View file

@ -0,0 +1,147 @@
// Copyright 2016-2021, Pulumi Corporation
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Xunit;
using Pulumi.Testing;
using Pulumi.Tests.Mocks;
namespace Pulumi.Tests
{
public class DeploymentRunnerTests
{
[Fact]
public async Task TerminatesEarlyOnException()
{
var deployResult = await Deployment.TryTestAsync<TerminatesEarlyOnExceptionStack>(new EmptyMocks());
Assert.NotNull(deployResult.Exception);
Assert.IsType<RunException>(deployResult.Exception!);
Assert.Contains("Deliberate test error", deployResult.Exception!.Message);
var stack = (TerminatesEarlyOnExceptionStack)deployResult.Resources[0];
Assert.False(stack.SlowOutput.GetValueAsync().IsCompleted);
}
class TerminatesEarlyOnExceptionStack : Stack
{
[Output]
public Output<int> SlowOutput { get; private set; }
public TerminatesEarlyOnExceptionStack()
{
Output.Create(Task.FromException<int>(new Exception("Deliberate test error")));
SlowOutput = Output.Create(Task.Delay(1000).ContinueWith(_ => 1));
}
}
[Fact]
public async Task LogsTaskDescriptions()
{
var resources = await Deployment.TestAsync<LogsTaskDescriptionsStack>(new EmptyMocks());
var stack = (LogsTaskDescriptionsStack)resources[0];
var messages = await stack.Logs;
for (var i = 0; i < 2; i++)
{
Assert.Contains($"Debug 0 Registering task: task{i}", messages);
Assert.Contains($"Debug 0 Completed task: task{i}", messages);
}
}
class LogsTaskDescriptionsStack : Stack
{
public Task<IEnumerable<string>> Logs { get; private set; }
public LogsTaskDescriptionsStack()
{
var deployment = Pulumi.Deployment.Instance.Internal;
var logger = new InMemoryLogger();
var runner = new Deployment.Runner(deployment, logger);
for (var i = 0; i < 2; i++)
{
runner.RegisterTask($"task{i}", Task.Delay(100 + i));
}
this.Logs = runner.WhileRunningAsync().ContinueWith(_ => logger.Messages);
}
}
class InMemoryLogger : ILogger
{
private readonly object _lockObject = new object();
private readonly List<string> _messages = new List<string>();
public void Log<TState>(
LogLevel level,
EventId eventId,
TState state,
Exception exc,
Func<TState, Exception, string> formatter)
{
var msg = formatter(state, exc);
Write($"{level} {eventId} {msg}");
}
public IEnumerable<String> Messages
{
get
{
lock (_lockObject)
{
return _messages.ToArray();
}
}
}
public IDisposable BeginScope<TState>(TState state)
{
Write($"BeginScope state={state}");
return new Scope()
{
Close = () => {
Write($"EndScope state={state}");
}
};
}
public bool IsEnabled(LogLevel level)
{
return true;
}
private void Write(string message)
{
lock (_lockObject)
{
_messages.Add(message);
}
}
class Scope : IDisposable
{
public Action Close { get; set; } = () => {};
public void Dispose()
{
Close();
}
}
}
class EmptyMocks : IMocks
{
public Task<object> CallAsync(MockCallArgs args)
{
return Task.FromResult<object>(args);
}
public Task<(string? id, object state)> NewResourceAsync(MockResourceArgs args)
{
throw new Exception($"Unknown resource {args.Type}");
}
}
}
}

View file

@ -1,9 +1,11 @@
// Copyright 2016-2020, Pulumi Corporation
// Copyright 2016-2021, Pulumi Corporation
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
@ -13,7 +15,7 @@ namespace Pulumi
{
public partial class Deployment
{
private class Runner : IRunner
internal class Runner : IRunner
{
private readonly IDeploymentInternal _deployment;
private readonly ILogger _deploymentLogger;
@ -29,10 +31,24 @@ namespace Pulumi
/// continuously, asynchronously loop, waiting for these tasks to complete, and only
/// exiting once the set becomes empty.
/// </summary>
private readonly Dictionary<Task, List<string>> _inFlightTasks = new Dictionary<Task, List<string>>();
private readonly TaskMonitoringHelper _inFlightTasks = new TaskMonitoringHelper();
private readonly object _exceptionsLock = new object();
private readonly List<Exception> _exceptions = new List<Exception>();
public ImmutableList<Exception> SwallowedExceptions => this._exceptions.ToImmutableList();
private readonly ConcurrentDictionary<(int TaskId, string Desc),int> _descriptions =
new ConcurrentDictionary<(int TaskId, string Desc),int>();
public ImmutableList<Exception> SwallowedExceptions
{
get
{
lock (_exceptionsLock)
{
return _exceptions.ToImmutableList();
}
}
}
public Runner(IDeploymentInternal deployment, ILogger deploymentLogger)
{
@ -80,21 +96,26 @@ namespace Pulumi
public void RegisterTask(string description, Task task)
{
_deploymentLogger.LogDebug($"Registering task: {description}");
_inFlightTasks.AddTask(task);
lock (_inFlightTasks)
// Ensure completion message is logged at most once when the task finishes.
if (_deploymentLogger.IsEnabled(LogLevel.Debug))
{
// We may get several of the same tasks with different descriptions. That can
// happen when the runtime reuses cached tasks that it knows are value-identical
// (for example Task.CompletedTask). In that case, we just store all the
// descriptions. We'll print them all out as done once this task actually
// finishes.
if (!_inFlightTasks.TryGetValue(task, out var descriptions))
{
descriptions = new List<string>();
_inFlightTasks.Add(task, descriptions);
}
descriptions.Add(description);
var key = (TaskId: task.Id, Desc: description);
int timesSeen = _descriptions.AddOrUpdate(key, _ => 1, (_, v) => v + 1);
if (timesSeen == 1)
{
task.ContinueWith(task => {
_deploymentLogger.LogDebug($"Completed task: {description}");
_descriptions.TryRemove(key, out _);
});
}
}
}
@ -105,86 +126,12 @@ namespace Pulumi
// 32 was picked so as to be very unlikely to collide with any other error codes.
private const int _processExitedAfterLoggingUserActionableMessage = 32;
private async Task<int> WhileRunningAsync()
internal async Task<int> WhileRunningAsync()
{
var tasks = new List<Task>();
// Keep looping as long as there are outstanding tasks that are still running.
while (true)
var errs = await _inFlightTasks.AwaitIdleOrFirstExceptionAsync().ConfigureAwait(false);
if (errs.Any())
{
tasks.Clear();
lock (_inFlightTasks)
{
if (_inFlightTasks.Count == 0)
{
// No more tasks in flight: exit the loop.
break;
}
// Grab all the tasks we currently have running.
tasks.AddRange(_inFlightTasks.Keys);
}
// Wait for one of the two events to happen:
// 1. All tasks in the list complete successfully, or
// 2. Any task throws an exception.
// There's no standard API with this semantics, so we create a custom completion source that is
// completed when remaining count is zero, or when an exception is thrown.
var remaining = tasks.Count;
var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
tasks.ForEach(HandleCompletion);
async void HandleCompletion(Task task)
{
try
{
// Wait for the task completion.
await task.ConfigureAwait(false);
// Log the descriptions of completed tasks.
List<string> descriptions;
lock (_inFlightTasks)
{
descriptions = _inFlightTasks[task];
}
foreach (var description in descriptions)
{
_deploymentLogger.LogDebug($"Completed task: {description}");
}
// Check if all the tasks are completed and signal the completion source if so.
if (Interlocked.Decrement(ref remaining) == 0)
{
tcs.TrySetResult(0);
}
}
catch (OperationCanceledException)
{
tcs.TrySetCanceled();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
finally
{
// Once finished, remove the task from the set of tasks that are running.
lock (_inFlightTasks)
{
_inFlightTasks.Remove(task);
}
}
}
try
{
// Now actually await that combined task and realize any exceptions it may have thrown.
await tcs.Task.ConfigureAwait(false);
}
catch (Exception e)
{
// if it threw, report it as necessary, then quit.
return await HandleExceptionAsync(e).ConfigureAwait(false);
}
return await HandleExceptionsAsync(errs).ConfigureAwait(false);
}
// there were no more tasks we were waiting on. Quit out, reporting if we had any
@ -192,26 +139,56 @@ namespace Pulumi
return _deployment.Logger.LoggedErrors ? 1 : 0;
}
private async Task<int> HandleExceptionAsync(Exception exception)
private Task<int> HandleExceptionAsync(Exception exception)
{
this._exceptions.Add(exception);
return HandleExceptionsAsync(new Exception[]{exception});
}
if (exception is LogException)
private async Task<int> HandleExceptionsAsync(IEnumerable<Exception> exceptions)
{
if (!exceptions.Any())
{
// We got an error while logging itself. Nothing to do here but print some errors
// and fail entirely.
_deploymentLogger.LogError(exception, "Error occurred trying to send logging message to engine");
await Console.Error.WriteLineAsync($"Error occurred trying to send logging message to engine:\n{exception.ToStringDemystified()}").ConfigureAwait(false);
return 1;
return 0;
}
lock (_exceptionsLock)
{
_exceptions.AddRange(exceptions);
}
// For the rest of the issue we encounter log the problem to the error stream. if we
// successfully do this, then return with a special error code stating as such so that
// our host doesn't print out another set of errors.
var loggedExceptionCount = 0;
foreach (var exception in exceptions)
{
var logged = await LogExceptionToErrorStream(exception);
loggedExceptionCount += logged ? 1 : 0;
}
// If we logged any exceptions, then return with a
// special error code stating as such so that our host
// does not print out another set of errors.
return (loggedExceptionCount > 0)
? _processExitedAfterLoggingUserActionableMessage
: 1;
}
private async Task<bool> LogExceptionToErrorStream(Exception exception)
{
if (exception is LogException)
{
// We got an error while logging itself. Nothing
// to do here but print some errors and abort.
_deploymentLogger.LogError(exception, "Error occurred trying to send logging message to engine");
await Console.Error.WriteLineAsync($"Error occurred trying to send logging message to engine:\n{exception.ToStringDemystified()}").ConfigureAwait(false);
return false;
}
// For all other issues we encounter we log the
// problem to the error stream.
//
// Note: if these logging calls fail, they will just end up bubbling up an exception
// that will be caught by nothing. This will tear down the actual process with a
// non-zero error which our host will handle properly.
// Note: if these logging calls fail, they will just
// end up bubbling up an exception that will be caught
// by nothing. This will tear down the actual process
// with a non-zero error which our host will handle
// properly.
if (exception is RunException)
{
// Always hide the stack for RunErrors.
@ -229,7 +206,7 @@ namespace Pulumi
}
_deploymentLogger.LogDebug("Returning from program after last error");
return _processExitedAfterLoggingUserActionableMessage;
return true;
}
}
}

View file

@ -29,7 +29,7 @@ namespace Pulumi
/// <returns>A dictionary of stack outputs.</returns>
public static Task<int> RunAsync(Func<IDictionary<string, object?>> func)
=> RunAsync(() => Task.FromResult(func()));
/// <summary>
/// <see cref="RunAsync(Func{Task{IDictionary{string, object}}}, StackOptions)"/> for more details.
/// </summary>
@ -167,18 +167,41 @@ namespace Pulumi
=> TestAsync(mocks, runner => runner.RunAsync<TStack>(), options);
private static async Task<ImmutableArray<Resource>> TestAsync(IMocks mocks, Func<IRunner, Task<int>> runAsync, TestOptions? options = null)
{
var result = await TryTestAsync(mocks, runAsync, options);
if (result.Exception != null)
{
throw result.Exception;
}
return result.Resources;
}
/// <summary>
/// Like `TestAsync`, but instead of throwing the errors
/// detected in the engine, returns them in the result tuple.
/// This enables tests to observe partially constructed
/// `Resources` vector in presence of deliberate errors.
/// </summary>
internal static async Task<(ImmutableArray<Resource> Resources, Exception? Exception)> TryTestAsync(
IMocks mocks, Func<IRunner, Task<int>> runAsync, TestOptions? options = null)
{
var engine = new MockEngine();
var monitor = new MockMonitor(mocks);
await CreateRunnerAndRunAsync(() => new Deployment(engine, monitor, options), runAsync).ConfigureAwait(false);
return engine.Errors.Count switch
Exception? err = engine.Errors.Count switch
{
1 => throw new RunException(engine.Errors.Single()),
var v when v > 1 => throw new AggregateException(engine.Errors.Select(e => new RunException(e))),
_ => monitor.Resources.ToImmutableArray()
1 => new RunException(engine.Errors.Single()),
var v when v > 1 => new AggregateException(engine.Errors.Select(e => new RunException(e))),
_ => null
};
return (Resources: monitor.Resources.ToImmutableArray(), Exception: err);
}
internal static Task<(ImmutableArray<Resource> Resources, Exception? Exception)> TryTestAsync<TStack>(
IMocks mocks, TestOptions? options = null)
where TStack : Stack, new()
=> TryTestAsync(mocks, runner => runner.RunAsync<TStack>(), options);
// this method *must* remain marked async
// in order to protect the scope of the AsyncLocal Deployment.Instance we cannot elide the task (return it early)
// if the task is returned early and not awaited, than it is possible for any code that runs before the eventual await

View file

@ -0,0 +1,209 @@
// Copyright 2016-2021, Pulumi Corporation
using System;
using System.Linq;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
namespace Pulumi
{
/// <summary>
/// Monitors dynamically added tasks for completion. Enters IDLE
/// state when all monitored tasks finish. Allows awaiting next
/// IDLE state or an exception, whichever comes first.
/// Thread-safe.
/// </summary>
internal sealed class TaskMonitoringHelper
{
private readonly TaskExceptionTracker _exceptionTracker = new TaskExceptionTracker();
private readonly TaskIdleTracker _idleTracker = new TaskIdleTracker();
/// <summary>
/// Starts monitoring the given task.
/// </summary>
public void AddTask(Task task)
{
_exceptionTracker.AddTask(task);
_idleTracker.AddTask(task);
}
/// <summary>
/// Awaits next IDLE state or an exception, whichever comes
/// first. Several exceptions may be returned if they have
/// been observed prior to this call.
///
/// IDLE state is represented as an empty sequence in the result.
/// </summary>
public async Task<IEnumerable<Exception>> AwaitIdleOrFirstExceptionAsync()
{
var exceptionTask = _exceptionTracker.AwaitExceptionAsync();
var idleTask = _idleTracker.AwaitIdleAsync();
var firstTask = await Task.WhenAny((Task)exceptionTask, idleTask).ConfigureAwait(false);
if (firstTask == idleTask)
{
return Enumerable.Empty<Exception>();
}
return await exceptionTask;
}
}
/// <summary>
/// Monitors dynamically added tasks for completion, allows awaiting IDLE state.
/// </summary>
internal sealed class TaskIdleTracker
{
private readonly object _lockObject = new object();
private int _activeTasks;
private TaskCompletionSource<int>? _promise;
// Caches the delegate instance to avoid repeated allocations.
private readonly Action<Task> _onTaskCompleted;
public TaskIdleTracker()
{
_onTaskCompleted = OnTaskCompleted;
}
/// <summary>
/// Awaits next IDLE state when no monitored tasks are running.
/// </summary>
public Task AwaitIdleAsync()
{
lock (_lockObject)
{
if (_activeTasks == 0)
{
return Task.FromResult(0);
}
if (_promise == null)
{
_promise = new TaskCompletionSource<int>();
}
return _promise.Task;
}
}
/// <summary>
/// Monitors the given task.
/// </summary>
public void AddTask(Task task)
{
lock (_lockObject)
{
_activeTasks++;
}
task.ContinueWith(_onTaskCompleted);
}
private void OnTaskCompleted(Task task)
{
lock (_lockObject)
{
_activeTasks--;
if (_activeTasks == 0 && _promise != null)
{
_promise.SetResult(0);
_promise = null;
}
}
}
}
/// <summary>
/// Monitors dynamically added tasks for exceptions, allows awaiting exceptions.
/// </summary>
internal sealed class TaskExceptionTracker
{
private readonly object _lockObject = new object();
private readonly List<Exception> _exceptions = new List<Exception>();
private TaskCompletionSource<IEnumerable<Exception>>? _promise;
// Caches the delegate instance to avoid repeated allocations.
private readonly Action<Task> _onTaskCompleted;
public TaskExceptionTracker()
{
_onTaskCompleted = OnTaskCompleted;
}
/// <summary>
/// Monitors the given task.
/// </summary>
public void AddTask(Task task)
{
task.ContinueWith(_onTaskCompleted);
}
/// <summary>
/// Awaits the next set of `Exception` in the monitored tasks.
/// May never complete. Never returns an empty sequence.
/// </summary>
public Task<IEnumerable<Exception>> AwaitExceptionAsync()
{
lock (_lockObject)
{
if (_exceptions.Count > 0)
{
var err = Flush();
if (err != null)
{
return Task.FromResult(err);
}
}
if (_promise == null)
{
_promise = new TaskCompletionSource<IEnumerable<Exception>>();
}
return _promise.Task;
}
}
private IEnumerable<Exception> Flush()
{
// It is possible for multiple tasks to complete with the
// same exception. This is happening in the test suite. It
// is also possible to register the same task twice,
// causing duplication.
//
// The `Distinct` here ensures this class does not report
// the same exception twice to the single call of
// `AwaitExceptionsAsync`.
//
// Note it is still possible to observe the same
// exception twice from separate calls to
// `AwaitExceptionsAsync`. This class opts not to keep
// state to track that global invariant.
var errs = _exceptions.Distinct().ToImmutableArray();
_exceptions.Clear();
return errs;
}
private void OnTaskCompleted(Task task)
{
if (!task.IsFaulted)
{
return;
}
AggregateException? errs = task.Exception;
if (errs == null)
{
return;
}
lock (_lockObject)
{
_exceptions.AddRange(errs.InnerExceptions);
if (_promise != null)
{
var err = Flush();
if (err != null)
{
_promise.SetResult(err);
}
_promise = null;
}
}
}
}
}