From db5318b0a5f8470f6d07dc09f12687f3de1e487d Mon Sep 17 00:00:00 2001
From: joeduffy
Date: Sat, 23 Dec 2017 10:15:08 -0800
Subject: [PATCH 1/2] Make the CLI's waitForUpdates more resilient to transient failure

We saw an issue where a user was mid-update, and got a networking error
stating `read: operation timed out`. We believe this was simply a local
client error, due to a flaky network. We should be resilient to such
things during updates, particularly when there's no way to "reattach" to
an in-progress update (see pulumi/pulumi#762).

This change accomplishes this by changing our retry logic in the cloud
backend's waitForUpdates function. Namely:

* We recognize three types of failure, and react differently:

    - Expected HTTP errors. For instance, the 504 Gateway Timeouts that
      we already retried in the face of. In these cases, we will silently
      retry up to 10 times. After 10 times, we begin warning the user
      just in case this is a persistent condition.

    - Unexpected HTTP errors. The CLI will quit immediately and issue an
      error to the user, in the usual ways. This covers Unauthorized
      among other things. Over time, we may find that we want to
      intentionally move some HTTP errors into the above.

    - Anything else. This covers the transient networking errors case
      that we have just seen. I'll admit, it's a wide net, but any
      instance of this error issues a warning and it's up to the user to
      ^C out of it. We also log the error so that we'll see it if the
      user shares their logs with us.

* We implement backoff logic so that we retry very quickly (100ms) on the
  first failure, and more slowly thereafter (1.5x, up to a max of
  5 seconds). This helps to avoid accidentally DoSing our service.
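To make the policy in the two bullets above concrete, here is a small standalone Go sketch of the same idea. It is illustrative only and not part of this patch: the httpError type and the poll and nextDelay helpers are hypothetical stand-ins for the CLI's real types, but the classification (silent retries for expected HTTP errors, an immediate failure for unexpected ones, a warning for everything else) and the 100ms/1.5x/5s schedule follow the description above.

package main

import (
    "errors"
    "fmt"
    "time"
)

// httpError stands in for an HTTP error response from the service; only the
// status code matters in this sketch.
type httpError struct{ code int }

func (e *httpError) Error() string { return fmt.Sprintf("HTTP %d", e.code) }

// nextDelay computes the schedule described above: 100ms on the first retry,
// then 1.5x growth, capped at 5 seconds.
func nextDelay(prev time.Duration) time.Duration {
    const max = 5 * time.Second
    if prev == 0 {
        return 100 * time.Millisecond
    }
    if d := time.Duration(float64(prev) * 1.5); d < max {
        return d
    }
    return max
}

func main() {
    // poll stands in for a single status request; here it times out twice and then succeeds.
    calls := 0
    poll := func() error {
        calls++
        if calls <= 2 {
            return errors.New("read: operation timed out")
        }
        return nil
    }

    var delay time.Duration
    for retries := 0; ; retries++ {
        err := poll()
        if err == nil {
            fmt.Println("got an update status; continue processing events")
            return
        }

        var httpErr *httpError
        switch {
        case errors.As(err, &httpErr) && httpErr.code == 504:
            // Expected HTTP error: retry silently for the first 10 attempts, then start warning.
            if retries >= 10 {
                fmt.Printf("warning: still waiting after %d retries: %v\n", retries, err)
            }
        case errors.As(err, &httpErr):
            // Unexpected HTTP error (e.g. Unauthorized): give up immediately.
            fmt.Println("error:", err)
            return
        default:
            // Anything else (e.g. a flaky network): warn and keep retrying; the user can ^C.
            fmt.Printf("warning: transient error, retrying: %v\n", err)
        }

        delay = nextDelay(delay)
        time.Sleep(delay)
    }
}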
---
 pkg/backend/cloud/backend.go | 70 +++++++++++++++++++++++++++++++-----
 1 file changed, 61 insertions(+), 9 deletions(-)

diff --git a/pkg/backend/cloud/backend.go b/pkg/backend/cloud/backend.go
index 193105f5e..73bc9bd0f 100644
--- a/pkg/backend/cloud/backend.go
+++ b/pkg/backend/cloud/backend.go
@@ -444,30 +444,82 @@ func (b *cloudBackend) makeProgramUpdateRequest(stackName tokens.QName) (apitype
     }, nil
 }
 
+const maxRetryTime = 5 * time.Second
+
 // waitForUpdate waits for the current update of a Pulumi program to reach a terminal state. Returns the
 // final state. "path" is the URL endpoint to poll for updates.
 func (b *cloudBackend) waitForUpdate(path string) (apitype.UpdateStatus, error) {
     // Events occur in sequence, filter out all the ones we have seen before in each request.
     eventIndex := "0"
     for {
+        // Query for the latest update results, including log entries so we can provide active status updates.
+        var retries int
+        var retryTime time.Duration
         var updateResults apitype.UpdateResults
-        pathWithIndex := fmt.Sprintf("%s?afterIndex=%s", path, eventIndex)
-        if err := pulumiRESTCall(b.cloudURL, "GET", pathWithIndex, nil, nil, &updateResults); err != nil {
-            // If our request to the Pulumi Service returned a 504 (Gateway Timeout), ignore it and keep continuing.
-            // TODO(pulumi/pulumi-ppc/issues/60): Elminate these timeouts all together.
-            if errResp, ok := err.(*apitype.ErrorResponse); ok && errResp.Code == 504 {
-                time.Sleep(1 * time.Second)
-                continue
+        for {
+            pathWithIndex := fmt.Sprintf("%s?afterIndex=%s", path, eventIndex)
+            err := pulumiRESTCall(b.cloudURL, "GET", pathWithIndex, nil, nil, &updateResults)
+            if err == nil {
+                break
             }
-            return apitype.StatusFailed, err
+
+            // There are three kinds of errors we might see:
+            //     1) Expected HTTP errors (like timeouts); silently retry.
+            //     2) Unexpected HTTP errors (like Unauthorized, etc); exit with an error.
+            //     3) Anything else; this could be any number of things, including transient errors (flaky network).
+            //        In this case, we warn the user and keep retrying; they can ^C if it's not transient.
+            warn := true
+            if errResp, ok := err.(*apitype.ErrorResponse); ok {
+                if errResp.Code == 504 {
+                    // If our request to the Pulumi Service returned a 504 (Gateway Timeout), ignore it and keep
+                    // continuing. The sole exception is if we've done this 10 times. At that point, we will have
+                    // been waiting for many seconds, and want to let the user know something might be wrong.
+                    // TODO(pulumi/pulumi-ppc/issues/60): Elminate these timeouts all together.
+                    if retries < 10 {
+                        warn = false
+                    }
+                    glog.V(3).Infof("Expected %s HTTP %d error after %d retries (retrying): %v",
+                        b.cloudURL, errResp.Code, retries, err)
+                } else {
+                    // Otherwise, we will issue an error.
+                    glog.V(3).Infof("Unexpected %s HTTP %d error after %d retries (erroring): %v",
+                        b.cloudURL, errResp.Code, retries, err)
+                    return apitype.StatusFailed, err
+                }
+            } else {
+                glog.V(3).Infof("Enexpected %s error after %d retries (retrying): %v", b.cloudURL, retries, err)
+            }
+
+            // Compute a new retry time. Start at 100ms and increase by 1.5x from there, up to a maximum of 5 seconds.
+            // The increasing delay avoids accidcentally DoSing the service endpoint.
+            if retryTime == 0 {
+                retryTime = 100 * time.Millisecond
+            } else {
+                retryTime = time.Duration(int64(float64(retryTime.Nanoseconds()) * 1.5))
+                if retryTime > maxRetryTime {
+                    retryTime = maxRetryTime
+                }
+            }
+
+            // Issue a warning if appropriate.
+            if warn {
+                b.d.Warningf(diag.Message("error querying update status: %v"), err)
+                b.d.Warningf(diag.Message("retrying in %vs... ^C to stop (this will not cancel the update)"),
+                    retryTime.Seconds())
+            }
+
+            // Now sleep and then go back around and try again...
+            retries++
+            time.Sleep(retryTime)
         }
 
+        // We got a result, print it out.
         for _, event := range updateResults.Events {
             printEvent(event)
             eventIndex = event.Index
         }
 
-        // Check if in termal state.
+        // Check if in termal state and if so return.
         updateStatus := apitype.UpdateStatus(updateResults.Status)
         switch updateStatus {
         case apitype.StatusFailed:

From 87079589f1cc8b861ae8c74cd511c9b88b387927 Mon Sep 17 00:00:00 2001
From: joeduffy
Date: Tue, 26 Dec 2017 09:39:49 -0800
Subject: [PATCH 2/2] Use the retry framework for REST API retries

This change incorporates feedback on https://github.com/pulumi/pulumi/pull/764,
in addition to refactoring the retry logic to use our retry framework rather
than hand-rolling it in the REST API code. It's a minor improvement, but at
least lets us consolidate some of this logic which we'll undoubtedly use more
of over time.
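For readers who have not used the retry package before, the following sketch shows how the Until/Acceptor API introduced by this change is meant to be called. The pollOnce function is a hypothetical stand-in for a single REST request; the retry.Until, retry.Acceptor, and Acceptance signatures are the ones added in the diff below, and the default schedule (100ms initial delay, 1.5x backoff, 5 second cap) comes from its new constants.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/pulumi/pulumi/pkg/util/retry"
)

func main() {
    // pollOnce stands in for a single REST call; it fails twice and then succeeds.
    calls := 0
    pollOnce := func() (string, error) {
        calls++
        if calls <= 2 {
            return "", errors.New("read: operation timed out")
        }
        return "succeeded", nil
    }

    // Keep invoking the acceptor until it accepts (returns true), returns an
    // error, or the context expires.
    ok, result, err := retry.Until(context.Background(), retry.Acceptor{
        Accept: func(try int, nextRetryTime time.Duration) (bool, interface{}, error) {
            status, err := pollOnce()
            if err != nil {
                // Treat every failure as transient in this sketch; returning false
                // asks the framework to sleep for nextRetryTime and call us again.
                fmt.Printf("try %d failed (%v); retrying in %v\n", try, err, nextRetryTime)
                return false, nil, nil
            }
            return true, status, nil
        },
    })
    if err != nil || !ok {
        fmt.Println("polling failed:", err)
        return
    }
    fmt.Println("final status:", result.(string))
}

Returning a non-nil error from the acceptor stops the retries immediately, which is how tryNextUpdate below surfaces unexpected HTTP errors.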
---
 pkg/backend/cloud/backend.go | 121 ++++++++++++++++++-----------------
 pkg/util/retry/until.go      |  82 ++++++++++++++----------
 2 files changed, 110 insertions(+), 93 deletions(-)

diff --git a/pkg/backend/cloud/backend.go b/pkg/backend/cloud/backend.go
index 73bc9bd0f..2402e71b0 100644
--- a/pkg/backend/cloud/backend.go
+++ b/pkg/backend/cloud/backend.go
@@ -3,6 +3,7 @@
 package cloud
 
 import (
+    "context"
     "encoding/base64"
     "fmt"
     "io"
@@ -31,6 +32,7 @@ import (
     "github.com/pulumi/pulumi/pkg/util/archive"
     "github.com/pulumi/pulumi/pkg/util/cmdutil"
     "github.com/pulumi/pulumi/pkg/util/contract"
+    "github.com/pulumi/pulumi/pkg/util/retry"
     "github.com/pulumi/pulumi/pkg/workspace"
 )
 
@@ -444,8 +446,6 @@ func (b *cloudBackend) makeProgramUpdateRequest(stackName tokens.QName) (apitype
     }, nil
 }
 
-const maxRetryTime = 5 * time.Second
-
 // waitForUpdate waits for the current update of a Pulumi program to reach a terminal state. Returns the
 // final state. "path" is the URL endpoint to poll for updates.
 func (b *cloudBackend) waitForUpdate(path string) (apitype.UpdateStatus, error) {
@@ -453,67 +453,18 @@ func (b *cloudBackend) waitForUpdate(path string) (apitype.UpdateStatus, error)
     eventIndex := "0"
     for {
         // Query for the latest update results, including log entries so we can provide active status updates.
-        var retries int
-        var retryTime time.Duration
-        var updateResults apitype.UpdateResults
-        for {
-            pathWithIndex := fmt.Sprintf("%s?afterIndex=%s", path, eventIndex)
-            err := pulumiRESTCall(b.cloudURL, "GET", pathWithIndex, nil, nil, &updateResults)
-            if err == nil {
-                break
-            }
-
-            // There are three kinds of errors we might see:
-            //     1) Expected HTTP errors (like timeouts); silently retry.
-            //     2) Unexpected HTTP errors (like Unauthorized, etc); exit with an error.
-            //     3) Anything else; this could be any number of things, including transient errors (flaky network).
-            //        In this case, we warn the user and keep retrying; they can ^C if it's not transient.
-            warn := true
-            if errResp, ok := err.(*apitype.ErrorResponse); ok {
-                if errResp.Code == 504 {
-                    // If our request to the Pulumi Service returned a 504 (Gateway Timeout), ignore it and keep
-                    // continuing. The sole exception is if we've done this 10 times. At that point, we will have
-                    // been waiting for many seconds, and want to let the user know something might be wrong.
-                    // TODO(pulumi/pulumi-ppc/issues/60): Elminate these timeouts all together.
-                    if retries < 10 {
-                        warn = false
-                    }
-                    glog.V(3).Infof("Expected %s HTTP %d error after %d retries (retrying): %v",
-                        b.cloudURL, errResp.Code, retries, err)
-                } else {
-                    // Otherwise, we will issue an error.
-                    glog.V(3).Infof("Unexpected %s HTTP %d error after %d retries (erroring): %v",
-                        b.cloudURL, errResp.Code, retries, err)
-                    return apitype.StatusFailed, err
-                }
-            } else {
-                glog.V(3).Infof("Enexpected %s error after %d retries (retrying): %v", b.cloudURL, retries, err)
-            }
-
-            // Compute a new retry time. Start at 100ms and increase by 1.5x from there, up to a maximum of 5 seconds.
-            // The increasing delay avoids accidcentally DoSing the service endpoint.
-            if retryTime == 0 {
-                retryTime = 100 * time.Millisecond
-            } else {
-                retryTime = time.Duration(int64(float64(retryTime.Nanoseconds()) * 1.5))
-                if retryTime > maxRetryTime {
-                    retryTime = maxRetryTime
-                }
-            }
-
-            // Issue a warning if appropriate.
-            if warn {
-                b.d.Warningf(diag.Message("error querying update status: %v"), err)
-                b.d.Warningf(diag.Message("retrying in %vs... ^C to stop (this will not cancel the update)"),
-                    retryTime.Seconds())
-            }
-
-            // Now sleep and then go back around and try again...
-            retries++
-            time.Sleep(retryTime)
+        pathWithIndex := fmt.Sprintf("%s?afterIndex=%s", path, eventIndex)
+        _, results, err := retry.Until(context.Background(), retry.Acceptor{
+            Accept: func(try int, nextRetryTime time.Duration) (bool, interface{}, error) {
+                return b.tryNextUpdate(pathWithIndex, try, nextRetryTime)
+            },
+        })
+        if err != nil {
+            return apitype.StatusFailed, err
         }
 
         // We got a result, print it out.
+        updateResults := results.(apitype.UpdateResults)
         for _, event := range updateResults.Events {
             printEvent(event)
             eventIndex = event.Index
@@ -530,6 +481,56 @@ func (b *cloudBackend) waitForUpdate(path string) (apitype.UpdateStatus, error)
     }
 }
 
+// tryNextUpdate tries to get the next update for a Pulumi program. This may time or error out, which resutls in a
+// false returned in the first return value. If a non-nil error is returned, this operation should fail.
+func (b *cloudBackend) tryNextUpdate(pathWithIndex string,
+    try int, nextRetryTime time.Duration) (bool, interface{}, error) {
+    // Perform the REST call.
+    var results apitype.UpdateResults
+    err := pulumiRESTCall(b.cloudURL, "GET", pathWithIndex, nil, nil, &results)
+
+    // If there is no error, we're done.
+    if err == nil {
+        return true, results, nil
+    }
+
+    // There are three kinds of errors we might see:
+    //     1) Expected HTTP errors (like timeouts); silently retry.
+    //     2) Unexpected HTTP errors (like Unauthorized, etc); exit with an error.
+    //     3) Anything else; this could be any number of things, including transient errors (flaky network).
+    //        In this case, we warn the user and keep retrying; they can ^C if it's not transient.
+    warn := true
+    if errResp, ok := err.(*apitype.ErrorResponse); ok {
+        if errResp.Code == 504 {
+            // If our request to the Pulumi Service returned a 504 (Gateway Timeout), ignore it and keep
+            // continuing. The sole exception is if we've done this 10 times. At that point, we will have
+            // been waiting for many seconds, and want to let the user know something might be wrong.
+            // TODO(pulumi/pulumi-ppc/issues/60): Elminate these timeouts all together.
+            if try < 10 {
+                warn = false
+            }
+            glog.V(3).Infof("Expected %s HTTP %d error after %d retries (retrying): %v",
+                b.cloudURL, errResp.Code, try, err)
+        } else {
+            // Otherwise, we will issue an error.
+            glog.V(3).Infof("Unexpected %s HTTP %d error after %d retries (erroring): %v",
+                b.cloudURL, errResp.Code, try, err)
+            return false, nil, err
+        }
+    } else {
+        glog.V(3).Infof("Unexpected %s error after %d retries (retrying): %v", b.cloudURL, try, err)
+    }
+
+    // Issue a warning if appropriate.
+    if warn {
+        b.d.Warningf(diag.Message("error querying update status: %v"), err)
+        b.d.Warningf(diag.Message("retrying in %vs... ^C to stop (this will not cancel the update)"),
+            nextRetryTime.Seconds())
+    }
+
+    return false, nil, nil
+}
+
 func printEvent(event apitype.UpdateEvent) {
     // Pluck out the string.
     if raw, ok := event.Fields["text"]; ok && raw != nil {
diff --git a/pkg/util/retry/until.go b/pkg/util/retry/until.go
index 7e533cda7..9c630ecb7 100644
--- a/pkg/util/retry/until.go
+++ b/pkg/util/retry/until.go
@@ -9,24 +9,26 @@ import (
 
 type Acceptor struct {
     Accept   Acceptance     // a function that determines when to proceed.
-    Progress Progress       // an optional progress function.
     Delay    *time.Duration // an optional delay duration.
     Backoff  *float64       // an optional backoff multiplier.
+    MaxDelay *time.Duration // an optional maximum delay duration.
 }
 
-type Progress func(int) bool
+// Acceptance is meant to accept a condition. It returns true when this condition has succeeded, and false otherwise
+// (to which the retry framework responds by waiting and retrying after a certain period of time). If a non-nil error
+// is returned, retrying halts. The interface{} data may be used to return final values to the caller.
+type Acceptance func(try int, nextRetryTime time.Duration) (bool, interface{}, error)
 
 const (
-    DefaultDelay   time.Duration = 250 * time.Millisecond // by default, delay by 250ms
-    DefaultBackoff float64       = 2.0                    // by default, backoff by 2.0
+    DefaultDelay    time.Duration = 100 * time.Millisecond // by default, delay by 100ms
+    DefaultBackoff  float64       = 1.5                    // by default, backoff by 1.5x
+    DefaultMaxDelay time.Duration = 5 * time.Second        // by default, no more than 5 seconds
 )
 
-type Acceptance func() (bool, error)
-
 // Until waits until the acceptor accepts the current condition, or the context expires, whichever comes first. A
 // return boolean of true means the acceptor eventually accepted; a non-nil error means the acceptor returned an error.
 // If an acceptor accepts a condition after the context has expired, we ignore the expiration and return the condition.
-func Until(ctx context.Context, acceptor Acceptor) (bool, error) {
+func Until(ctx context.Context, acceptor Acceptor) (bool, interface{}, error) {
     expired := false
 
     // Prepare our delay and backoff variables.
@@ -42,44 +44,58 @@
     } else {
         backoff = *acceptor.Backoff
     }
-
-    // If the context expires before the waiter has accepted, return.
-    go func() {
-        <-ctx.Done()
-        expired = true
-    }()
-
-    // Loop until the condition is accepted, or the context expires, whichever comes first.
-    tries := 1
-    for !expired {
-        if b, err := acceptor.Accept(); b || err != nil {
-            return b, err
-        }
-        if acceptor.Progress != nil && !acceptor.Progress(tries) {
-            break // progress function asked to quit.
-        }
-        time.Sleep(delay)
-        delay = time.Duration(float64(delay) * backoff)
-        tries++
+    var maxDelay time.Duration
+    if acceptor.MaxDelay == nil {
+        maxDelay = DefaultMaxDelay
+    } else {
+        maxDelay = *acceptor.MaxDelay
     }
 
-    return false, nil
+    // If the context expires before the waiter has accepted, return.
+    if ctx != nil {
+        go func() {
+            <-ctx.Done()
+            expired = true
+        }()
+    }
+
+    // Loop until the condition is accepted, or the context expires, whichever comes first.
+    var try int
+    for !expired {
+        // Compute the next retry time so the callback can access it.
+        delay = time.Duration(float64(delay) * backoff)
+        if delay > maxDelay {
+            delay = maxDelay
+        }
+
+        // Try the acceptance condition; if it returns true, or an error, we are done.
+        b, data, err := acceptor.Accept(try, delay)
+        if b || err != nil {
+            return b, data, err
+        }
+
+        // About to try again. Sleep, bump the retry count, and go around the horn again.
+        time.Sleep(delay)
+        try++
+    }
+
+    return false, nil, nil
 }
 
 // UntilDeadline creates a child context with the given deadline, and then invokes the above Until function.
-func UntilDeadline(ctx context.Context, acceptor Acceptor, deadline time.Time) (bool, error) {
+func UntilDeadline(ctx context.Context, acceptor Acceptor, deadline time.Time) (bool, interface{}, error) {
     var cancel context.CancelFunc
     ctx, cancel = context.WithDeadline(ctx, deadline)
-    b, err := Until(ctx, acceptor)
+    b, data, err := Until(ctx, acceptor)
     cancel()
-    return b, err
+    return b, data, err
 }
 
 // UntilTimeout creates a child context with the given timeout, and then invokes the above Until function.
-func UntilTimeout(ctx context.Context, acceptor Acceptor, timeout time.Duration) (bool, error) {
+func UntilTimeout(ctx context.Context, acceptor Acceptor, timeout time.Duration) (bool, interface{}, error) {
     var cancel context.CancelFunc
     ctx, cancel = context.WithTimeout(ctx, timeout)
-    b, err := Until(ctx, acceptor)
+    b, data, err := Until(ctx, acceptor)
     cancel()
-    return b, err
+    return b, data, err
 }
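One closing observation about the new defaults, illustrated by the standalone snippet below (not part of the patch): because Until multiplies the delay by the backoff factor at the top of each iteration, the first nextRetryTime an acceptor sees is 150ms (100ms x 1.5) rather than 100ms, and the schedule reaches the 5 second cap after roughly ten tries.

package main

import (
    "fmt"
    "time"
)

func main() {
    // Mirror Until's new defaults: 100ms initial delay, 1.5x backoff, 5s cap.
    // The loop multiplies before the first use, so the printed sequence starts at 150ms.
    delay := 100 * time.Millisecond
    backoff := 1.5
    maxDelay := 5 * time.Second

    for try := 0; try < 12; try++ {
        delay = time.Duration(float64(delay) * backoff)
        if delay > maxDelay {
            delay = maxDelay
        }
        fmt.Printf("try %2d: next retry in %v\n", try, delay)
    }
    // Prints 150ms, 225ms, 337.5ms, ... settling at 5s from the tenth try onward.
}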