Fixing WaitGroup related panics in the Go SDK (#7661)

* Add tests reproducing the panics

* Add v2 of the test

* Use tailor-made workGroup to mimic sync.WaitGroup

* Lint

* CHANGELOG
This commit is contained in:
Anton Tayanovskyy 2021-07-29 12:39:28 -04:00 committed by GitHub
parent 1409d88438
commit 5788befcf4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 267 additions and 21 deletions

View file

@ -4,3 +4,7 @@
- [cli] - Respect provider aliases
[#7166](https://github.com/pulumi/pulumi/pull/7166)
- [sdk/go] - Fix panics caused by logging from `ApplyT`, affecting
`pulumi-docker` and potentially other providers
[#7661](https://github.com/pulumi/pulumi/pull/7661)

View file

@ -58,7 +58,7 @@ type Context struct {
rpcsLock sync.Mutex // a lock protecting the RPC count and event.
rpcError error // the first error (if any) encountered during an RPC.
join sync.WaitGroup // the waitgroup for non-RPC async work associated with this context
join workGroup // the waitgroup for non-RPC async work associated with this context
Log Log // the logging interface for the Pulumi log stream.
}

View file

@ -0,0 +1,120 @@
// Copyright 2016-2021, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pulumi
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// The test is extracted from a panic using pulumi-docker and minified
// while still reproducing the panic. The issue is that the resource
// constructor `NewImage` processes `StringInput` and logs into a
// captured `*pulumi.Context` from `ApplyT`. The user program passes a
// vanilla `String`. The `ApplyT` is not tracked against the context
// join group, but the logging is, which causes the logging statement
// to appear as "dynamic" work that appeared unexpectedly after "all
// work was done", and race with the program completion `Wait`.
//
// The test is was made to pass by using a custom-made `workGroup`.
func TestLoggingFromApplyCausesNoPanics(t *testing.T) {
// Usually panics on iteration 100-200
for i := 0; i < 1000; i++ {
t.Logf("Iteration %d\n", i)
mocks := &testMonitor{}
err := RunErr(func(ctx *Context) error {
String("X").ToStringOutput().ApplyT(func(string) int {
err := ctx.Log.Debug("Zzz", &LogArgs{})
assert.NoError(t, err)
return 0
})
return nil
}, WithMocks("project", "stack", mocks))
assert.NoError(t, err)
}
}
// An extended version of `TestLoggingFromApplyCausesNoPanics`, more
// realistically demonstrating the original usage pattern.
func TestLoggingFromResourceApplyCausesNoPanics(t *testing.T) {
// Usually panics on iteration 100-200
for i := 0; i < 1000; i++ {
t.Logf("Iteration %d\n", i)
mocks := &testMonitor{}
err := RunErr(func(ctx *Context) error {
_, err := NewLoggingTestResource(t, ctx, "res", String("A"))
assert.NoError(t, err)
return nil
}, WithMocks("project", "stack", mocks))
assert.NoError(t, err)
}
}
type LoggingTestResource struct {
ResourceState
TestOutput StringOutput
}
func NewLoggingTestResource(
t *testing.T,
ctx *Context,
name string,
input StringInput,
opts ...ResourceOption) (*LoggingTestResource, error) {
resource := &LoggingTestResource{}
err := ctx.RegisterComponentResource("test:go:NewLoggingTestResource", name, resource, opts...)
if err != nil {
return nil, err
}
resource.TestOutput = input.ToStringOutput().ApplyT(func(inputValue string) (string, error) {
time.Sleep(10)
err := ctx.Log.Debug("Zzz", &LogArgs{})
assert.NoError(t, err)
return inputValue, nil
}).(StringOutput)
outputs := Map(map[string]Input{
"testOutput": resource.TestOutput,
})
err = ctx.RegisterResourceOutputs(resource, outputs)
if err != nil {
return nil, err
}
return resource, nil
}
// A contrived test demonstrating queueing work dynamically (`ApplyT`
// called from a separate goroutine). This used to cause a panic but
// is now resolved by using `workGroup`.
func TestWaitingCausesNoPanics(t *testing.T) {
for i := 0; i < 10; i++ {
mocks := &testMonitor{}
err := RunErr(func(ctx *Context) error {
o, set, _ := ctx.NewOutput()
go func() {
set(1)
o.ApplyT(func(x interface{}) interface{} { return x })
}()
return nil
}, WithMocks("project", "stack", mocks))
assert.NoError(t, err)
}
}

View file

@ -18,7 +18,6 @@ package pulumi
import (
"strings"
"sync"
pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
"golang.org/x/net/context"
@ -37,7 +36,7 @@ type Log interface {
type logState struct {
engine pulumirpc.EngineClient
ctx context.Context
join *sync.WaitGroup
join *workGroup
}
// LogArgs may be used to specify arguments to be used for logging.

View file

@ -53,15 +53,15 @@ func RegisterOutputType(output Output) {
}
}
type waitGroups []*sync.WaitGroup
type workGroups []*workGroup
func (wgs waitGroups) add() {
func (wgs workGroups) add() {
for _, g := range wgs {
g.Add(1)
}
}
func (wgs waitGroups) done() {
func (wgs workGroups) done() {
for _, g := range wgs {
g.Done()
}
@ -78,7 +78,7 @@ type OutputState struct {
mutex sync.Mutex
cond *sync.Cond
join *sync.WaitGroup // the wait group associated with this output, if any.
join *workGroup // the wait group associated with this output, if any.
state uint32 // one of output{Pending,Resolved,Rejected}
@ -238,7 +238,7 @@ func (o *OutputState) getState() *OutputState {
return o
}
func newOutputState(join *sync.WaitGroup, elementType reflect.Type, deps ...Resource) *OutputState {
func newOutputState(join *workGroup, elementType reflect.Type, deps ...Resource) *OutputState {
if join != nil {
join.Add(1)
}
@ -255,7 +255,7 @@ func newOutputState(join *sync.WaitGroup, elementType reflect.Type, deps ...Reso
var outputStateType = reflect.TypeOf((*OutputState)(nil))
var outputTypeToOutputState sync.Map // map[reflect.Type]int
func newOutput(wg *sync.WaitGroup, typ reflect.Type, deps ...Resource) Output {
func newOutput(wg *workGroup, typ reflect.Type, deps ...Resource) Output {
contract.Assert(typ.Implements(outputType))
// All values that implement Output must embed a field of type `*OutputState` by virtue of the unexported
@ -283,7 +283,7 @@ func newOutput(wg *sync.WaitGroup, typ reflect.Type, deps ...Resource) Output {
return output.Interface().(Output)
}
func newAnyOutput(wg *sync.WaitGroup) (Output, func(interface{}), func(error)) {
func newAnyOutput(wg *workGroup) (Output, func(interface{}), func(error)) {
out := newOutputState(wg, anyType)
resolve := func(v interface{}) {
@ -507,18 +507,18 @@ func AllWithContext(ctx context.Context, inputs ...interface{}) ArrayOutput {
return ToOutputWithContext(ctx, inputs).(ArrayOutput)
}
func gatherDependencies(v interface{}) ([]Resource, waitGroups) {
func gatherDependencies(v interface{}) ([]Resource, workGroups) {
if v == nil {
return nil, nil
}
depSet := make(map[Resource]struct{})
joinSet := make(map[*sync.WaitGroup]struct{})
joinSet := make(map[*workGroup]struct{})
gatherDependencySet(reflect.ValueOf(v), depSet, joinSet)
var joins waitGroups
var joins workGroups
if len(joinSet) > 0 {
joins = make([]*sync.WaitGroup, 0, len(joinSet))
joins = make([]*workGroup, 0, len(joinSet))
for j := range joinSet {
joins = append(joins, j)
}
@ -537,7 +537,7 @@ func gatherDependencies(v interface{}) ([]Resource, waitGroups) {
var resourceType = reflect.TypeOf((*Resource)(nil)).Elem()
func gatherDependencySet(v reflect.Value, deps map[Resource]struct{}, joins map[*sync.WaitGroup]struct{}) {
func gatherDependencySet(v reflect.Value, deps map[Resource]struct{}, joins map[*workGroup]struct{}) {
for {
// Check for an Output that we can pull dependencies off of.
if v.Type().Implements(outputType) && v.CanInterface() {
@ -796,7 +796,7 @@ func awaitInputs(ctx context.Context, v, resolved reflect.Value) (bool, bool, []
return known, secret, deps, err
}
func toOutputTWithContext(ctx context.Context, join *sync.WaitGroup, outputType reflect.Type, v interface{}, result reflect.Value, forceSecretVal *bool) Output {
func toOutputTWithContext(ctx context.Context, join *workGroup, outputType reflect.Type, v interface{}, result reflect.Value, forceSecretVal *bool) Output {
deps, joins := gatherDependencies(v)
done := joins.done
@ -807,7 +807,7 @@ func toOutputTWithContext(ctx context.Context, join *sync.WaitGroup, outputType
case 1:
join, joins, done = joins[0], nil, func() {}
default:
join = &sync.WaitGroup{}
join = &workGroup{}
done = func() {
join.Wait()
joins.done()
@ -849,7 +849,7 @@ func ToOutputWithContext(ctx context.Context, v interface{}) Output {
return toOutputWithContext(ctx, nil, v, nil)
}
func toOutputWithContext(ctx context.Context, join *sync.WaitGroup, v interface{}, forceSecretVal *bool) Output {
func toOutputWithContext(ctx context.Context, join *workGroup, v interface{}, forceSecretVal *bool) Output {
resultType := reflect.TypeOf(v)
if input, ok := v.(Input); ok {
resultType = input.ElementType()
@ -941,7 +941,7 @@ func AnyWithContext(ctx context.Context, v interface{}) AnyOutput {
return anyWithContext(ctx, nil, v)
}
func anyWithContext(ctx context.Context, join *sync.WaitGroup, v interface{}) AnyOutput {
func anyWithContext(ctx context.Context, join *workGroup, v interface{}) AnyOutput {
var result interface{}
return toOutputTWithContext(ctx, join, anyOutputType, v, reflect.ValueOf(&result).Elem(), nil).(AnyOutput)
}

View file

@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"reflect"
"sync"
"testing"
"github.com/stretchr/testify/assert"
@ -599,7 +598,7 @@ func TestDeps(t *testing.T) {
}
func testMixedWaitGroups(t *testing.T, combine func(o1, o2 Output) Output) {
var wg1, wg2 sync.WaitGroup
var wg1, wg2 workGroup
o1 := newOutput(&wg1, anyOutputType)
o2 := newOutput(&wg2, anyOutputType)

View file

@ -0,0 +1,71 @@
// Copyright 2016-2021, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pulumi
import (
"fmt"
"sync"
)
// Mimicks the interface of `sync.WaitGroup` but does not panic in
// case of races between `Wait` and `Add` with a positive delta in the
// state with a zero counter. The reason `sync.WaitGroup` panics is to
// warn about a race condition. Using `workGroup` implicitly accept
// these race conditions instead. Use sparingly and document why it is
// used.
type workGroup struct {
mutex sync.Mutex
cond *sync.Cond
counter int
}
func (wg *workGroup) Wait() {
wg.mutex.Lock()
defer wg.mutex.Unlock()
if wg.cond == nil {
wg.cond = sync.NewCond(&wg.mutex)
}
for wg.counter > 0 {
wg.cond.Wait()
}
}
func (wg *workGroup) Add(delta int) {
wg.mutex.Lock()
defer wg.mutex.Unlock()
if wg.cond == nil {
wg.cond = sync.NewCond(&wg.mutex)
}
c := wg.counter + delta
if c < 0 {
panic(fmt.Sprintf("Adding %d would make workGroup counter negative: %d + %d = %d",
delta, wg.counter, delta, c))
}
wg.counter = c
if c == 0 {
wg.cond.Broadcast()
}
}
func (wg *workGroup) Done() {
wg.Add(-1)
}

View file

@ -0,0 +1,53 @@
// Copyright 2016-2021, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pulumi
import (
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestWorkGroupActsAsWaitGroup(t *testing.T) {
check := func(j int) func(*testing.T) {
return func(*testing.T) {
var n int32 = 0
wg := &workGroup{}
wg.Add(j)
for k := 0; k < j; k++ {
go func() {
time.Sleep(10 * time.Millisecond)
atomic.AddInt32(&n, 1)
wg.Done()
}()
}
wg.Wait()
assert.Equal(t, int32(j), atomic.AddInt32(&n, 0))
}
}
t.Run("j=1", check(1))
t.Run("j=2", check(2))
t.Run("j=3", check(3))
t.Run("j=4", check(4))
// test Wait does not block on empty
wg := &workGroup{}
wg.Wait()
}