* Fix race condition in TaskMonitoringHelper Fixes #8163 TaskMonitoringHelper was using two seperate trackers for Idle and FirstException and then calling WhenAny on both to see which state happened first. This was racy as you could end up completing a task with an exception but getting the idle tracker fire first, resulting in TaskMonitoringHelper thinking no exception had happened. I've combined the two trackers into TaskMonitoringHelper now. At each task completion we check for exceptions and then idleness. * Add changelog
122 lines
3.8 KiB
C#
122 lines
3.8 KiB
C#
// Copyright 2016-2021, Pulumi Corporation
|
|
|
|
using System;
|
|
using System.Linq;
|
|
using System.Collections.Generic;
|
|
using System.Collections.Immutable;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace Pulumi
|
|
{
|
|
/// <summary>
|
|
/// Monitors dynamically added tasks for completion. Enters IDLE
|
|
/// state when all monitored tasks finish. Allows awaiting next
|
|
/// IDLE state or an exception, whichever comes first.
|
|
/// Thread-safe.
|
|
/// </summary>
|
|
internal sealed class TaskMonitoringHelper
|
|
{
|
|
private readonly object _lockObject = new object();
|
|
private int _activeTasks;
|
|
|
|
private readonly List<Exception> _exceptions = new List<Exception>();
|
|
|
|
private TaskCompletionSource<IEnumerable<Exception>>? _promise;
|
|
|
|
// Caches the delegate instance to avoid repeated allocations.
|
|
private readonly Action<Task> _onTaskCompleted;
|
|
|
|
public TaskMonitoringHelper()
|
|
{
|
|
_onTaskCompleted = OnTaskCompleted;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Starts monitoring the given task.
|
|
/// </summary>
|
|
public void AddTask(Task task)
|
|
{
|
|
lock (_lockObject)
|
|
{
|
|
_activeTasks++;
|
|
}
|
|
task.ContinueWith(_onTaskCompleted);
|
|
}
|
|
|
|
private IEnumerable<Exception> Flush()
|
|
{
|
|
// It is possible for multiple tasks to complete with the
|
|
// same exception. This is happening in the test suite. It
|
|
// is also possible to register the same task twice,
|
|
// causing duplication.
|
|
//
|
|
// The `Distinct` here ensures this class does not report
|
|
// the same exception twice to the single call of
|
|
// `AwaitExceptionsAsync`.
|
|
//
|
|
// Note it is still possible to observe the same
|
|
// exception twice from separate calls to
|
|
// `AwaitExceptionsAsync`. This class opts not to keep
|
|
// state to track that global invariant.
|
|
var errs = _exceptions.Distinct().ToImmutableArray();
|
|
_exceptions.Clear();
|
|
return errs;
|
|
}
|
|
|
|
private void OnTaskCompleted(Task task)
|
|
{
|
|
lock (_lockObject)
|
|
{
|
|
_activeTasks--;
|
|
|
|
if (task.IsFaulted && task.Exception != null)
|
|
{
|
|
_exceptions.AddRange(task.Exception.InnerExceptions);
|
|
}
|
|
|
|
if (_exceptions.Count > 0 && _promise != null)
|
|
{
|
|
_promise.SetResult(Flush());
|
|
_promise = null;
|
|
}
|
|
else if (_activeTasks == 0 && _promise != null)
|
|
{
|
|
_promise.SetResult(Enumerable.Empty<Exception>());
|
|
_promise = null;
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Awaits next IDLE state or an exception, whichever comes
|
|
/// first. Several exceptions may be returned if they have
|
|
/// been observed prior to this call.
|
|
///
|
|
/// IDLE state is represented as an empty sequence in the result.
|
|
/// </summary>
|
|
public Task<IEnumerable<Exception>> AwaitIdleOrFirstExceptionAsync()
|
|
{
|
|
lock (_lockObject)
|
|
{
|
|
if (_exceptions.Count > 0)
|
|
{
|
|
return Task.FromResult(Flush());
|
|
}
|
|
else if (_activeTasks == 0)
|
|
{
|
|
return Task.FromResult(Enumerable.Empty<Exception>());
|
|
}
|
|
else
|
|
{
|
|
if (_promise == null)
|
|
{
|
|
_promise = new TaskCompletionSource<IEnumerable<Exception>>();
|
|
}
|
|
return _promise.Task;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|