Merge pull request #764 from pulumi/763_wait_for_update_resiliency
Make the CLI's waitForUpdates more resilient to transient failure
This commit is contained in:
commit
86c1e7ad39
|
@ -3,6 +3,7 @@
|
||||||
package cloud
|
package cloud
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -31,6 +32,7 @@ import (
|
||||||
"github.com/pulumi/pulumi/pkg/util/archive"
|
"github.com/pulumi/pulumi/pkg/util/archive"
|
||||||
"github.com/pulumi/pulumi/pkg/util/cmdutil"
|
"github.com/pulumi/pulumi/pkg/util/cmdutil"
|
||||||
"github.com/pulumi/pulumi/pkg/util/contract"
|
"github.com/pulumi/pulumi/pkg/util/contract"
|
||||||
|
"github.com/pulumi/pulumi/pkg/util/retry"
|
||||||
"github.com/pulumi/pulumi/pkg/workspace"
|
"github.com/pulumi/pulumi/pkg/workspace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -450,24 +452,25 @@ func (b *cloudBackend) waitForUpdate(path string) (apitype.UpdateStatus, error)
|
||||||
// Events occur in sequence, filter out all the ones we have seen before in each request.
|
// Events occur in sequence, filter out all the ones we have seen before in each request.
|
||||||
eventIndex := "0"
|
eventIndex := "0"
|
||||||
for {
|
for {
|
||||||
var updateResults apitype.UpdateResults
|
// Query for the latest update results, including log entries so we can provide active status updates.
|
||||||
pathWithIndex := fmt.Sprintf("%s?afterIndex=%s", path, eventIndex)
|
pathWithIndex := fmt.Sprintf("%s?afterIndex=%s", path, eventIndex)
|
||||||
if err := pulumiRESTCall(b.cloudURL, "GET", pathWithIndex, nil, nil, &updateResults); err != nil {
|
_, results, err := retry.Until(context.Background(), retry.Acceptor{
|
||||||
// If our request to the Pulumi Service returned a 504 (Gateway Timeout), ignore it and keep continuing.
|
Accept: func(try int, nextRetryTime time.Duration) (bool, interface{}, error) {
|
||||||
// TODO(pulumi/pulumi-ppc/issues/60): Elminate these timeouts all together.
|
return b.tryNextUpdate(pathWithIndex, try, nextRetryTime)
|
||||||
if errResp, ok := err.(*apitype.ErrorResponse); ok && errResp.Code == 504 {
|
},
|
||||||
time.Sleep(1 * time.Second)
|
})
|
||||||
continue
|
if err != nil {
|
||||||
}
|
|
||||||
return apitype.StatusFailed, err
|
return apitype.StatusFailed, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We got a result, print it out.
|
||||||
|
updateResults := results.(apitype.UpdateResults)
|
||||||
for _, event := range updateResults.Events {
|
for _, event := range updateResults.Events {
|
||||||
printEvent(event)
|
printEvent(event)
|
||||||
eventIndex = event.Index
|
eventIndex = event.Index
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if in termal state.
|
// Check if in termal state and if so return.
|
||||||
updateStatus := apitype.UpdateStatus(updateResults.Status)
|
updateStatus := apitype.UpdateStatus(updateResults.Status)
|
||||||
switch updateStatus {
|
switch updateStatus {
|
||||||
case apitype.StatusFailed:
|
case apitype.StatusFailed:
|
||||||
|
@ -478,6 +481,56 @@ func (b *cloudBackend) waitForUpdate(path string) (apitype.UpdateStatus, error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tryNextUpdate tries to get the next update for a Pulumi program. This may time or error out, which resutls in a
|
||||||
|
// false returned in the first return value. If a non-nil error is returned, this operation should fail.
|
||||||
|
func (b *cloudBackend) tryNextUpdate(pathWithIndex string,
|
||||||
|
try int, nextRetryTime time.Duration) (bool, interface{}, error) {
|
||||||
|
// Perform the REST call.
|
||||||
|
var results apitype.UpdateResults
|
||||||
|
err := pulumiRESTCall(b.cloudURL, "GET", pathWithIndex, nil, nil, &results)
|
||||||
|
|
||||||
|
// If there is no error, we're done.
|
||||||
|
if err == nil {
|
||||||
|
return true, results, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// There are three kinds of errors we might see:
|
||||||
|
// 1) Expected HTTP errors (like timeouts); silently retry.
|
||||||
|
// 2) Unexpected HTTP errors (like Unauthorized, etc); exit with an error.
|
||||||
|
// 3) Anything else; this could be any number of things, including transient errors (flaky network).
|
||||||
|
// In this case, we warn the user and keep retrying; they can ^C if it's not transient.
|
||||||
|
warn := true
|
||||||
|
if errResp, ok := err.(*apitype.ErrorResponse); ok {
|
||||||
|
if errResp.Code == 504 {
|
||||||
|
// If our request to the Pulumi Service returned a 504 (Gateway Timeout), ignore it and keep
|
||||||
|
// continuing. The sole exception is if we've done this 10 times. At that point, we will have
|
||||||
|
// been waiting for many seconds, and want to let the user know something might be wrong.
|
||||||
|
// TODO(pulumi/pulumi-ppc/issues/60): Elminate these timeouts all together.
|
||||||
|
if try < 10 {
|
||||||
|
warn = false
|
||||||
|
}
|
||||||
|
glog.V(3).Infof("Expected %s HTTP %d error after %d retries (retrying): %v",
|
||||||
|
b.cloudURL, errResp.Code, try, err)
|
||||||
|
} else {
|
||||||
|
// Otherwise, we will issue an error.
|
||||||
|
glog.V(3).Infof("Unexpected %s HTTP %d error after %d retries (erroring): %v",
|
||||||
|
b.cloudURL, errResp.Code, try, err)
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.V(3).Infof("Unexpected %s error after %d retries (retrying): %v", b.cloudURL, try, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Issue a warning if appropriate.
|
||||||
|
if warn {
|
||||||
|
b.d.Warningf(diag.Message("error querying update status: %v"), err)
|
||||||
|
b.d.Warningf(diag.Message("retrying in %vs... ^C to stop (this will not cancel the update)"),
|
||||||
|
nextRetryTime.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
func printEvent(event apitype.UpdateEvent) {
|
func printEvent(event apitype.UpdateEvent) {
|
||||||
// Pluck out the string.
|
// Pluck out the string.
|
||||||
if raw, ok := event.Fields["text"]; ok && raw != nil {
|
if raw, ok := event.Fields["text"]; ok && raw != nil {
|
||||||
|
|
|
@ -9,24 +9,26 @@ import (
|
||||||
|
|
||||||
type Acceptor struct {
|
type Acceptor struct {
|
||||||
Accept Acceptance // a function that determines when to proceed.
|
Accept Acceptance // a function that determines when to proceed.
|
||||||
Progress Progress // an optional progress function.
|
|
||||||
Delay *time.Duration // an optional delay duration.
|
Delay *time.Duration // an optional delay duration.
|
||||||
Backoff *float64 // an optional backoff multiplier.
|
Backoff *float64 // an optional backoff multiplier.
|
||||||
|
MaxDelay *time.Duration // an optional maximum delay duration.
|
||||||
}
|
}
|
||||||
|
|
||||||
type Progress func(int) bool
|
// Acceptance is meant to accept a condition. It returns true when this condition has succeeded, and false otherwise
|
||||||
|
// (to which the retry framework responds by waiting and retrying after a certain period of time). If a non-nil error
|
||||||
|
// is returned, retrying halts. The interface{} data may be used to return final values to the caller.
|
||||||
|
type Acceptance func(try int, nextRetryTime time.Duration) (bool, interface{}, error)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DefaultDelay time.Duration = 250 * time.Millisecond // by default, delay by 250ms
|
DefaultDelay time.Duration = 100 * time.Millisecond // by default, delay by 100ms
|
||||||
DefaultBackoff float64 = 2.0 // by default, backoff by 2.0
|
DefaultBackoff float64 = 1.5 // by default, backoff by 1.5x
|
||||||
|
DefaultMaxDelay time.Duration = 5 * time.Second // by default, no more than 5 seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
type Acceptance func() (bool, error)
|
|
||||||
|
|
||||||
// Until waits until the acceptor accepts the current condition, or the context expires, whichever comes first. A
|
// Until waits until the acceptor accepts the current condition, or the context expires, whichever comes first. A
|
||||||
// return boolean of true means the acceptor eventually accepted; a non-nil error means the acceptor returned an error.
|
// return boolean of true means the acceptor eventually accepted; a non-nil error means the acceptor returned an error.
|
||||||
// If an acceptor accepts a condition after the context has expired, we ignore the expiration and return the condition.
|
// If an acceptor accepts a condition after the context has expired, we ignore the expiration and return the condition.
|
||||||
func Until(ctx context.Context, acceptor Acceptor) (bool, error) {
|
func Until(ctx context.Context, acceptor Acceptor) (bool, interface{}, error) {
|
||||||
expired := false
|
expired := false
|
||||||
|
|
||||||
// Prepare our delay and backoff variables.
|
// Prepare our delay and backoff variables.
|
||||||
|
@ -42,44 +44,58 @@ func Until(ctx context.Context, acceptor Acceptor) (bool, error) {
|
||||||
} else {
|
} else {
|
||||||
backoff = *acceptor.Backoff
|
backoff = *acceptor.Backoff
|
||||||
}
|
}
|
||||||
|
var maxDelay time.Duration
|
||||||
// If the context expires before the waiter has accepted, return.
|
if acceptor.MaxDelay == nil {
|
||||||
go func() {
|
maxDelay = DefaultMaxDelay
|
||||||
<-ctx.Done()
|
} else {
|
||||||
expired = true
|
maxDelay = *acceptor.MaxDelay
|
||||||
}()
|
|
||||||
|
|
||||||
// Loop until the condition is accepted, or the context expires, whichever comes first.
|
|
||||||
tries := 1
|
|
||||||
for !expired {
|
|
||||||
if b, err := acceptor.Accept(); b || err != nil {
|
|
||||||
return b, err
|
|
||||||
}
|
|
||||||
if acceptor.Progress != nil && !acceptor.Progress(tries) {
|
|
||||||
break // progress function asked to quit.
|
|
||||||
}
|
|
||||||
time.Sleep(delay)
|
|
||||||
delay = time.Duration(float64(delay) * backoff)
|
|
||||||
tries++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, nil
|
// If the context expires before the waiter has accepted, return.
|
||||||
|
if ctx != nil {
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
expired = true
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Loop until the condition is accepted, or the context expires, whichever comes first.
|
||||||
|
var try int
|
||||||
|
for !expired {
|
||||||
|
// Compute the next retry time so the callback can access it.
|
||||||
|
delay = time.Duration(float64(delay) * backoff)
|
||||||
|
if delay > maxDelay {
|
||||||
|
delay = maxDelay
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try the acceptance condition; if it returns true, or an error, we are done.
|
||||||
|
b, data, err := acceptor.Accept(try, delay)
|
||||||
|
if b || err != nil {
|
||||||
|
return b, data, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// About to try again. Sleep, bump the retry count, and go around the horn again.
|
||||||
|
time.Sleep(delay)
|
||||||
|
try++
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UntilDeadline creates a child context with the given deadline, and then invokes the above Until function.
|
// UntilDeadline creates a child context with the given deadline, and then invokes the above Until function.
|
||||||
func UntilDeadline(ctx context.Context, acceptor Acceptor, deadline time.Time) (bool, error) {
|
func UntilDeadline(ctx context.Context, acceptor Acceptor, deadline time.Time) (bool, interface{}, error) {
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
ctx, cancel = context.WithDeadline(ctx, deadline)
|
ctx, cancel = context.WithDeadline(ctx, deadline)
|
||||||
b, err := Until(ctx, acceptor)
|
b, data, err := Until(ctx, acceptor)
|
||||||
cancel()
|
cancel()
|
||||||
return b, err
|
return b, data, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// UntilTimeout creates a child context with the given timeout, and then invokes the above Until function.
|
// UntilTimeout creates a child context with the given timeout, and then invokes the above Until function.
|
||||||
func UntilTimeout(ctx context.Context, acceptor Acceptor, timeout time.Duration) (bool, error) {
|
func UntilTimeout(ctx context.Context, acceptor Acceptor, timeout time.Duration) (bool, interface{}, error) {
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
ctx, cancel = context.WithTimeout(ctx, timeout)
|
ctx, cancel = context.WithTimeout(ctx, timeout)
|
||||||
b, err := Until(ctx, acceptor)
|
b, data, err := Until(ctx, acceptor)
|
||||||
cancel()
|
cancel()
|
||||||
return b, err
|
return b, data, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue