pulumi/sdk/dotnet/Pulumi/Deployment/TaskMonitoringHelper.cs
Fraser Waters a199ba8bb5
Fix race condition in TaskMonitoringHelper (#8294)
* Fix race condition in TaskMonitoringHelper

Fixes #8163

TaskMonitoringHelper was using two separate trackers for Idle and
FirstException and then calling WhenAny on both to see which state
happened first. This was racy, as a task could complete with an
exception but the idle tracker could fire first, resulting in
TaskMonitoringHelper thinking no exception had happened.

I've combined the two trackers into TaskMonitoringHelper now. At each
task completion we check for exceptions and then idleness.

* Add changelog
2021-10-26 22:37:47 +01:00

122 lines
3.8 KiB
C#

// Copyright 2016-2021, Pulumi Corporation
using System;
using System.Linq;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
namespace Pulumi
{
/// <summary>
/// Monitors dynamically added tasks for completion. Enters IDLE
/// state when all monitored tasks finish. Allows awaiting next
/// IDLE state or an exception, whichever comes first.
/// Thread-safe.
/// </summary>
internal sealed class TaskMonitoringHelper
{
    // Guards every mutable field below.
    private readonly object _lockObject = new object();

    // Number of tasks passed to AddTask whose completion callback has not run yet.
    private int _activeTasks;

    // Exceptions collected from faulted tasks that have not been handed to a waiter yet.
    private readonly List<Exception> _exceptions = new List<Exception>();

    // The outstanding waiter, if any. Null when nobody is waiting.
    private TaskCompletionSource<IEnumerable<Exception>>? _promise;

    // Caches the delegate instance to avoid repeated allocations.
    private readonly Action<Task> _onTaskCompleted;

    public TaskMonitoringHelper()
    {
        _onTaskCompleted = OnTaskCompleted;
    }

    /// <summary>
    /// Starts monitoring the given task.
    /// </summary>
    /// <param name="task">The task to monitor.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="task"/> is null.
    /// </exception>
    public void AddTask(Task task)
    {
        // Validate before counting: failing after the increment would leave
        // the counter above zero forever and IDLE would never be reported.
        if (task == null)
        {
            throw new ArgumentNullException(nameof(task));
        }

        lock (_lockObject)
        {
            _activeTasks++;
        }

        // Pin the scheduler so the bookkeeping callback does not depend on
        // whatever TaskScheduler.Current happens to be at the call site.
        _ = task.ContinueWith(_onTaskCompleted, TaskScheduler.Default);
    }

    /// <summary>
    /// Returns the distinct pending exceptions and clears the pending list.
    /// Must be called while holding <c>_lockObject</c>.
    /// </summary>
    private IEnumerable<Exception> Flush()
    {
        // It is possible for multiple tasks to complete with the
        // same exception. This is happening in the test suite. It
        // is also possible to register the same task twice,
        // causing duplication.
        //
        // The `Distinct` here ensures this class does not report
        // the same exception twice to the single call of
        // `AwaitIdleOrFirstExceptionAsync`.
        //
        // Note it is still possible to observe the same
        // exception twice from separate calls to
        // `AwaitIdleOrFirstExceptionAsync`. This class opts not to keep
        // state to track that global invariant.
        var errs = _exceptions.Distinct().ToImmutableArray();
        _exceptions.Clear();
        return errs;
    }

    /// <summary>
    /// Completes the outstanding waiter (if any) with the given result.
    /// Must be called while holding <c>_lockObject</c>.
    /// </summary>
    private void CompletePromise(IEnumerable<Exception> result)
    {
        // Detach the promise before completing it so that a later call to
        // AwaitIdleOrFirstExceptionAsync can never be handed this
        // already-completed promise.
        var promise = _promise;
        _promise = null;
        promise?.SetResult(result);
    }

    /// <summary>
    /// Completion callback attached to every monitored task: records the
    /// task's exceptions (if it faulted) and wakes the waiter when there
    /// are exceptions to report or no active tasks remain.
    /// </summary>
    private void OnTaskCompleted(Task task)
    {
        lock (_lockObject)
        {
            _activeTasks--;

            if (task.IsFaulted && task.Exception != null)
            {
                _exceptions.AddRange(task.Exception.InnerExceptions);
            }

            if (_promise == null)
            {
                // Nobody is waiting; exceptions stay queued for the next call
                // to AwaitIdleOrFirstExceptionAsync.
                return;
            }

            // Exceptions take priority over idleness.
            if (_exceptions.Count > 0)
            {
                CompletePromise(Flush());
            }
            else if (_activeTasks == 0)
            {
                CompletePromise(Enumerable.Empty<Exception>());
            }
        }
    }

    /// <summary>
    /// Awaits next IDLE state or an exception, whichever comes
    /// first. Several exceptions may be returned if they have
    /// been observed prior to this call.
    ///
    /// IDLE state is represented as an empty sequence in the result.
    /// </summary>
    /// <returns>
    /// A task producing the pending exceptions, or an empty sequence
    /// when all monitored tasks have finished without pending exceptions.
    /// </returns>
    public Task<IEnumerable<Exception>> AwaitIdleOrFirstExceptionAsync()
    {
        lock (_lockObject)
        {
            if (_exceptions.Count > 0)
            {
                return Task.FromResult(Flush());
            }

            if (_activeTasks == 0)
            {
                return Task.FromResult(Enumerable.Empty<Exception>());
            }

            if (_promise == null)
            {
                // RunContinuationsAsynchronously keeps awaiting code from
                // running inline inside SetResult, i.e. on the completing
                // thread while _lockObject is still held.
                _promise = new TaskCompletionSource<IEnumerable<Exception>>(
                    TaskCreationOptions.RunContinuationsAsynchronously);
            }

            return _promise.Task;
        }
    }
}
}