diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 8df0dbd8b..95c21fe40 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -4,3 +4,7 @@ - [cli] - Respect provider aliases [#7166](https://github.com/pulumi/pulumi/pull/7166) + +- [sdk/go] - Fix panics caused by logging from `ApplyT`, affecting + `pulumi-docker` and potentially other providers + [#7661](https://github.com/pulumi/pulumi/pull/7661) diff --git a/sdk/go/pulumi/context.go b/sdk/go/pulumi/context.go index c4ce1a9c6..81d6fb780 100644 --- a/sdk/go/pulumi/context.go +++ b/sdk/go/pulumi/context.go @@ -58,7 +58,7 @@ type Context struct { rpcsLock sync.Mutex // a lock protecting the RPC count and event. rpcError error // the first error (if any) encountered during an RPC. - join sync.WaitGroup // the waitgroup for non-RPC async work associated with this context + join workGroup // the waitgroup for non-RPC async work associated with this context Log Log // the logging interface for the Pulumi log stream. } diff --git a/sdk/go/pulumi/context_test.go b/sdk/go/pulumi/context_test.go new file mode 100644 index 000000000..88667c0c8 --- /dev/null +++ b/sdk/go/pulumi/context_test.go @@ -0,0 +1,120 @@ +// Copyright 2016-2021, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package pulumi + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// The test is extracted from a panic using pulumi-docker and minified +// while still reproducing the panic. The issue is that the resource +// constructor `NewImage` processes `StringInput` and logs into a +// captured `*pulumi.Context` from `ApplyT`. The user program passes a +// vanilla `String`. The `ApplyT` is not tracked against the context +// join group, but the logging is, which causes the logging statement +// to appear as "dynamic" work that appeared unexpectedly after "all +// work was done", and race with the program completion `Wait`. +// +// The test was made to pass by using a custom-made `workGroup`. +func TestLoggingFromApplyCausesNoPanics(t *testing.T) { + // Before the `workGroup` fix this usually panicked on iteration 100-200 + for i := 0; i < 1000; i++ { + t.Logf("Iteration %d\n", i) + mocks := &testMonitor{} + err := RunErr(func(ctx *Context) error { + String("X").ToStringOutput().ApplyT(func(string) int { + err := ctx.Log.Debug("Zzz", &LogArgs{}) + assert.NoError(t, err) + return 0 + }) + return nil + }, WithMocks("project", "stack", mocks)) + assert.NoError(t, err) + } +} + +// An extended version of `TestLoggingFromApplyCausesNoPanics`, more +// realistically demonstrating the original usage pattern. 
+func TestLoggingFromResourceApplyCausesNoPanics(t *testing.T) { + // Usually panics on iteration 100-200 + for i := 0; i < 1000; i++ { + t.Logf("Iteration %d\n", i) + mocks := &testMonitor{} + err := RunErr(func(ctx *Context) error { + _, err := NewLoggingTestResource(t, ctx, "res", String("A")) + assert.NoError(t, err) + return nil + }, WithMocks("project", "stack", mocks)) + assert.NoError(t, err) + } +} + +type LoggingTestResource struct { + ResourceState + TestOutput StringOutput +} + +func NewLoggingTestResource( + t *testing.T, + ctx *Context, + name string, + input StringInput, + opts ...ResourceOption) (*LoggingTestResource, error) { + + resource := &LoggingTestResource{} + err := ctx.RegisterComponentResource("test:go:NewLoggingTestResource", name, resource, opts...) + if err != nil { + return nil, err + } + + resource.TestOutput = input.ToStringOutput().ApplyT(func(inputValue string) (string, error) { + time.Sleep(10 * time.Nanosecond) // NOTE(review): was a bare 10 (= 10ns); confirm a longer delay was not intended + err := ctx.Log.Debug("Zzz", &LogArgs{}) + assert.NoError(t, err) + return inputValue, nil + }).(StringOutput) + + outputs := Map(map[string]Input{ + "testOutput": resource.TestOutput, + }) + + err = ctx.RegisterResourceOutputs(resource, outputs) + if err != nil { + return nil, err + } + + return resource, nil +} + +// A contrived test demonstrating queueing work dynamically (`ApplyT` +// called from a separate goroutine). This used to cause a panic but +// is now resolved by using `workGroup`. 
+func TestWaitingCausesNoPanics(t *testing.T) { + for i := 0; i < 10; i++ { + mocks := &testMonitor{} + err := RunErr(func(ctx *Context) error { + o, set, _ := ctx.NewOutput() // third result unused here — presumably the reject callback + go func() { // resolves and chains ApplyT off the callback's goroutine; the callback returns without waiting + set(1) + o.ApplyT(func(x interface{}) interface{} { return x }) + }() + return nil + }, WithMocks("project", "stack", mocks)) + assert.NoError(t, err) + } +} diff --git a/sdk/go/pulumi/log.go b/sdk/go/pulumi/log.go index b1ede79b3..08d322bbc 100644 --- a/sdk/go/pulumi/log.go +++ b/sdk/go/pulumi/log.go @@ -18,7 +18,6 @@ package pulumi import ( "strings" - "sync" pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go" "golang.org/x/net/context" @@ -37,7 +36,7 @@ type Log interface { type logState struct { engine pulumirpc.EngineClient ctx context.Context - join *sync.WaitGroup + join *workGroup } // LogArgs may be used to specify arguments to be used for logging. diff --git a/sdk/go/pulumi/types.go b/sdk/go/pulumi/types.go index 369b2f69f..1f27ba14e 100644 --- a/sdk/go/pulumi/types.go +++ b/sdk/go/pulumi/types.go @@ -53,15 +53,15 @@ func RegisterOutputType(output Output) { } } -type waitGroups []*sync.WaitGroup +type workGroups []*workGroup -func (wgs waitGroups) add() { +func (wgs workGroups) add() { for _, g := range wgs { g.Add(1) } } -func (wgs waitGroups) done() { +func (wgs workGroups) done() { for _, g := range wgs { g.Done() } @@ -78,7 +78,7 @@ type OutputState struct { mutex sync.Mutex cond *sync.Cond - join *sync.WaitGroup // the wait group associated with this output, if any. + join *workGroup // the wait group associated with this output, if any. 
state uint32 // one of output{Pending,Resolved,Rejected} @@ -238,7 +238,7 @@ func (o *OutputState) getState() *OutputState { return o } -func newOutputState(join *sync.WaitGroup, elementType reflect.Type, deps ...Resource) *OutputState { +func newOutputState(join *workGroup, elementType reflect.Type, deps ...Resource) *OutputState { if join != nil { join.Add(1) } @@ -255,7 +255,7 @@ func newOutputState(join *sync.WaitGroup, elementType reflect.Type, deps ...Reso var outputStateType = reflect.TypeOf((*OutputState)(nil)) var outputTypeToOutputState sync.Map // map[reflect.Type]int -func newOutput(wg *sync.WaitGroup, typ reflect.Type, deps ...Resource) Output { +func newOutput(wg *workGroup, typ reflect.Type, deps ...Resource) Output { contract.Assert(typ.Implements(outputType)) // All values that implement Output must embed a field of type `*OutputState` by virtue of the unexported @@ -283,7 +283,7 @@ func newOutput(wg *sync.WaitGroup, typ reflect.Type, deps ...Resource) Output { return output.Interface().(Output) } -func newAnyOutput(wg *sync.WaitGroup) (Output, func(interface{}), func(error)) { +func newAnyOutput(wg *workGroup) (Output, func(interface{}), func(error)) { out := newOutputState(wg, anyType) resolve := func(v interface{}) { @@ -507,18 +507,18 @@ func AllWithContext(ctx context.Context, inputs ...interface{}) ArrayOutput { return ToOutputWithContext(ctx, inputs).(ArrayOutput) } -func gatherDependencies(v interface{}) ([]Resource, waitGroups) { +func gatherDependencies(v interface{}) ([]Resource, workGroups) { if v == nil { return nil, nil } depSet := make(map[Resource]struct{}) - joinSet := make(map[*sync.WaitGroup]struct{}) + joinSet := make(map[*workGroup]struct{}) gatherDependencySet(reflect.ValueOf(v), depSet, joinSet) - var joins waitGroups + var joins workGroups if len(joinSet) > 0 { - joins = make([]*sync.WaitGroup, 0, len(joinSet)) + joins = make([]*workGroup, 0, len(joinSet)) for j := range joinSet { joins = append(joins, j) } @@ -537,7 +537,7 
@@ func gatherDependencies(v interface{}) ([]Resource, waitGroups) { var resourceType = reflect.TypeOf((*Resource)(nil)).Elem() -func gatherDependencySet(v reflect.Value, deps map[Resource]struct{}, joins map[*sync.WaitGroup]struct{}) { +func gatherDependencySet(v reflect.Value, deps map[Resource]struct{}, joins map[*workGroup]struct{}) { for { // Check for an Output that we can pull dependencies off of. if v.Type().Implements(outputType) && v.CanInterface() { @@ -796,7 +796,7 @@ func awaitInputs(ctx context.Context, v, resolved reflect.Value) (bool, bool, [] return known, secret, deps, err } -func toOutputTWithContext(ctx context.Context, join *sync.WaitGroup, outputType reflect.Type, v interface{}, result reflect.Value, forceSecretVal *bool) Output { +func toOutputTWithContext(ctx context.Context, join *workGroup, outputType reflect.Type, v interface{}, result reflect.Value, forceSecretVal *bool) Output { deps, joins := gatherDependencies(v) done := joins.done @@ -807,7 +807,7 @@ func toOutputTWithContext(ctx context.Context, join *sync.WaitGroup, outputType case 1: join, joins, done = joins[0], nil, func() {} default: - join = &sync.WaitGroup{} + join = &workGroup{} done = func() { join.Wait() joins.done() @@ -849,7 +849,7 @@ func ToOutputWithContext(ctx context.Context, v interface{}) Output { return toOutputWithContext(ctx, nil, v, nil) } -func toOutputWithContext(ctx context.Context, join *sync.WaitGroup, v interface{}, forceSecretVal *bool) Output { +func toOutputWithContext(ctx context.Context, join *workGroup, v interface{}, forceSecretVal *bool) Output { resultType := reflect.TypeOf(v) if input, ok := v.(Input); ok { resultType = input.ElementType() @@ -941,7 +941,7 @@ func AnyWithContext(ctx context.Context, v interface{}) AnyOutput { return anyWithContext(ctx, nil, v) } -func anyWithContext(ctx context.Context, join *sync.WaitGroup, v interface{}) AnyOutput { +func anyWithContext(ctx context.Context, join *workGroup, v interface{}) AnyOutput { var 
result interface{} return toOutputTWithContext(ctx, join, anyOutputType, v, reflect.ValueOf(&result).Elem(), nil).(AnyOutput) } diff --git a/sdk/go/pulumi/types_test.go b/sdk/go/pulumi/types_test.go index fff5348f8..12436595a 100644 --- a/sdk/go/pulumi/types_test.go +++ b/sdk/go/pulumi/types_test.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "reflect" - "sync" "testing" "github.com/stretchr/testify/assert" @@ -599,7 +598,7 @@ func TestDeps(t *testing.T) { } func testMixedWaitGroups(t *testing.T, combine func(o1, o2 Output) Output) { - var wg1, wg2 sync.WaitGroup + var wg1, wg2 workGroup o1 := newOutput(&wg1, anyOutputType) o2 := newOutput(&wg2, anyOutputType) diff --git a/sdk/go/pulumi/workgroup.go b/sdk/go/pulumi/workgroup.go new file mode 100644 index 000000000..33bba7e1e --- /dev/null +++ b/sdk/go/pulumi/workgroup.go @@ -0,0 +1,71 @@ +// Copyright 2016-2021, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pulumi + +import ( + "fmt" + "sync" +) + +// Mimics the interface of `sync.WaitGroup` but does not panic in +// case of races between `Wait` and `Add` with a positive delta while +// the counter is zero. The reason `sync.WaitGroup` panics is to +// warn about a race condition. Using `workGroup` implicitly accepts +// these race conditions instead. Use sparingly and document why it is +// used. 
+type workGroup struct { + mutex sync.Mutex // held by Wait and Add around every access to cond and counter + cond *sync.Cond // created on first Wait/Add; broadcast when counter reaches zero + counter int // outstanding work; Add panics rather than let it go negative +} + +func (wg *workGroup) Wait() { // blocks until counter is zero; returns at once if it already is + wg.mutex.Lock() + defer wg.mutex.Unlock() + + if wg.cond == nil { + wg.cond = sync.NewCond(&wg.mutex) + } + + for wg.counter > 0 { // re-check after every wakeup + wg.cond.Wait() + } +} + +func (wg *workGroup) Add(delta int) { // adjusts counter by delta; wakes all waiters when it hits zero + wg.mutex.Lock() + defer wg.mutex.Unlock() + + if wg.cond == nil { + wg.cond = sync.NewCond(&wg.mutex) + } + + c := wg.counter + delta + + if c < 0 { + panic(fmt.Sprintf("Adding %d would make workGroup counter negative: %d + %d = %d", + delta, wg.counter, delta, c)) + } + + wg.counter = c + + if c == 0 { + wg.cond.Broadcast() + } +} + +func (wg *workGroup) Done() { // shorthand for Add(-1) + wg.Add(-1) +} diff --git a/sdk/go/pulumi/workgroup_test.go b/sdk/go/pulumi/workgroup_test.go new file mode 100644 index 000000000..a3c6e5b16 --- /dev/null +++ b/sdk/go/pulumi/workgroup_test.go @@ -0,0 +1,53 @@ +// Copyright 2016-2021, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package pulumi + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestWorkGroupActsAsWaitGroup(t *testing.T) { + check := func(j int) func(*testing.T) { + return func(t *testing.T) { + var n int32 + wg := &workGroup{} + wg.Add(j) + + for k := 0; k < j; k++ { + go func() { + time.Sleep(10 * time.Millisecond) + atomic.AddInt32(&n, 1) + wg.Done() + }() + } + + wg.Wait() + assert.Equal(t, int32(j), atomic.LoadInt32(&n)) + } + } + + t.Run("j=1", check(1)) + t.Run("j=2", check(2)) + t.Run("j=3", check(3)) + t.Run("j=4", check(4)) + + // Wait must not block on a zero-value workGroup + wg := &workGroup{} + wg.Wait() +}