Use the retry framework for REST API retries
This change incorporates feedback on https://github.com/pulumi/pulumi/pull/764, in addition to refactoring the retry logic to use our retry framework rather than hand-rolling it in the REST API code. It's a minor improvement, but at least lets us consolidate some of this logic which we'll undoubtedly use more of over time.
This commit is contained in:
parent
db5318b0a5
commit
87079589f1
|
@ -3,6 +3,7 @@
|
|||
package cloud
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -31,6 +32,7 @@ import (
|
|||
"github.com/pulumi/pulumi/pkg/util/archive"
|
||||
"github.com/pulumi/pulumi/pkg/util/cmdutil"
|
||||
"github.com/pulumi/pulumi/pkg/util/contract"
|
||||
"github.com/pulumi/pulumi/pkg/util/retry"
|
||||
"github.com/pulumi/pulumi/pkg/workspace"
|
||||
)
|
||||
|
||||
|
@ -444,8 +446,6 @@ func (b *cloudBackend) makeProgramUpdateRequest(stackName tokens.QName) (apitype
|
|||
}, nil
|
||||
}
|
||||
|
||||
const maxRetryTime = 5 * time.Second
|
||||
|
||||
// waitForUpdate waits for the current update of a Pulumi program to reach a terminal state. Returns the
|
||||
// final state. "path" is the URL endpoint to poll for updates.
|
||||
func (b *cloudBackend) waitForUpdate(path string) (apitype.UpdateStatus, error) {
|
||||
|
@ -453,67 +453,18 @@ func (b *cloudBackend) waitForUpdate(path string) (apitype.UpdateStatus, error)
|
|||
eventIndex := "0"
|
||||
for {
|
||||
// Query for the latest update results, including log entries so we can provide active status updates.
|
||||
var retries int
|
||||
var retryTime time.Duration
|
||||
var updateResults apitype.UpdateResults
|
||||
for {
|
||||
pathWithIndex := fmt.Sprintf("%s?afterIndex=%s", path, eventIndex)
|
||||
err := pulumiRESTCall(b.cloudURL, "GET", pathWithIndex, nil, nil, &updateResults)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// There are three kinds of errors we might see:
|
||||
// 1) Expected HTTP errors (like timeouts); silently retry.
|
||||
// 2) Unexpected HTTP errors (like Unauthorized, etc); exit with an error.
|
||||
// 3) Anything else; this could be any number of things, including transient errors (flaky network).
|
||||
// In this case, we warn the user and keep retrying; they can ^C if it's not transient.
|
||||
warn := true
|
||||
if errResp, ok := err.(*apitype.ErrorResponse); ok {
|
||||
if errResp.Code == 504 {
|
||||
// If our request to the Pulumi Service returned a 504 (Gateway Timeout), ignore it and keep
|
||||
// continuing. The sole exception is if we've done this 10 times. At that point, we will have
|
||||
// been waiting for many seconds, and want to let the user know something might be wrong.
|
||||
// TODO(pulumi/pulumi-ppc/issues/60): Elminate these timeouts all together.
|
||||
if retries < 10 {
|
||||
warn = false
|
||||
}
|
||||
glog.V(3).Infof("Expected %s HTTP %d error after %d retries (retrying): %v",
|
||||
b.cloudURL, errResp.Code, retries, err)
|
||||
} else {
|
||||
// Otherwise, we will issue an error.
|
||||
glog.V(3).Infof("Unexpected %s HTTP %d error after %d retries (erroring): %v",
|
||||
b.cloudURL, errResp.Code, retries, err)
|
||||
return apitype.StatusFailed, err
|
||||
}
|
||||
} else {
|
||||
glog.V(3).Infof("Enexpected %s error after %d retries (retrying): %v", b.cloudURL, retries, err)
|
||||
}
|
||||
|
||||
// Compute a new retry time. Start at 100ms and increase by 1.5x from there, up to a maximum of 5 seconds.
|
||||
// The increasing delay avoids accidcentally DoSing the service endpoint.
|
||||
if retryTime == 0 {
|
||||
retryTime = 100 * time.Millisecond
|
||||
} else {
|
||||
retryTime = time.Duration(int64(float64(retryTime.Nanoseconds()) * 1.5))
|
||||
if retryTime > maxRetryTime {
|
||||
retryTime = maxRetryTime
|
||||
}
|
||||
}
|
||||
|
||||
// Issue a warning if appropriate.
|
||||
if warn {
|
||||
b.d.Warningf(diag.Message("error querying update status: %v"), err)
|
||||
b.d.Warningf(diag.Message("retrying in %vs... ^C to stop (this will not cancel the update)"),
|
||||
retryTime.Seconds())
|
||||
}
|
||||
|
||||
// Now sleep and then go back around and try again...
|
||||
retries++
|
||||
time.Sleep(retryTime)
|
||||
pathWithIndex := fmt.Sprintf("%s?afterIndex=%s", path, eventIndex)
|
||||
_, results, err := retry.Until(context.Background(), retry.Acceptor{
|
||||
Accept: func(try int, nextRetryTime time.Duration) (bool, interface{}, error) {
|
||||
return b.tryNextUpdate(pathWithIndex, try, nextRetryTime)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return apitype.StatusFailed, err
|
||||
}
|
||||
|
||||
// We got a result, print it out.
|
||||
updateResults := results.(apitype.UpdateResults)
|
||||
for _, event := range updateResults.Events {
|
||||
printEvent(event)
|
||||
eventIndex = event.Index
|
||||
|
@ -530,6 +481,56 @@ func (b *cloudBackend) waitForUpdate(path string) (apitype.UpdateStatus, error)
|
|||
}
|
||||
}
|
||||
|
||||
// tryNextUpdate tries to get the next update for a Pulumi program. This may time or error out, which resutls in a
|
||||
// false returned in the first return value. If a non-nil error is returned, this operation should fail.
|
||||
func (b *cloudBackend) tryNextUpdate(pathWithIndex string,
|
||||
try int, nextRetryTime time.Duration) (bool, interface{}, error) {
|
||||
// Perform the REST call.
|
||||
var results apitype.UpdateResults
|
||||
err := pulumiRESTCall(b.cloudURL, "GET", pathWithIndex, nil, nil, &results)
|
||||
|
||||
// If there is no error, we're done.
|
||||
if err == nil {
|
||||
return true, results, nil
|
||||
}
|
||||
|
||||
// There are three kinds of errors we might see:
|
||||
// 1) Expected HTTP errors (like timeouts); silently retry.
|
||||
// 2) Unexpected HTTP errors (like Unauthorized, etc); exit with an error.
|
||||
// 3) Anything else; this could be any number of things, including transient errors (flaky network).
|
||||
// In this case, we warn the user and keep retrying; they can ^C if it's not transient.
|
||||
warn := true
|
||||
if errResp, ok := err.(*apitype.ErrorResponse); ok {
|
||||
if errResp.Code == 504 {
|
||||
// If our request to the Pulumi Service returned a 504 (Gateway Timeout), ignore it and keep
|
||||
// continuing. The sole exception is if we've done this 10 times. At that point, we will have
|
||||
// been waiting for many seconds, and want to let the user know something might be wrong.
|
||||
// TODO(pulumi/pulumi-ppc/issues/60): Elminate these timeouts all together.
|
||||
if try < 10 {
|
||||
warn = false
|
||||
}
|
||||
glog.V(3).Infof("Expected %s HTTP %d error after %d retries (retrying): %v",
|
||||
b.cloudURL, errResp.Code, try, err)
|
||||
} else {
|
||||
// Otherwise, we will issue an error.
|
||||
glog.V(3).Infof("Unexpected %s HTTP %d error after %d retries (erroring): %v",
|
||||
b.cloudURL, errResp.Code, try, err)
|
||||
return false, nil, err
|
||||
}
|
||||
} else {
|
||||
glog.V(3).Infof("Unexpected %s error after %d retries (retrying): %v", b.cloudURL, try, err)
|
||||
}
|
||||
|
||||
// Issue a warning if appropriate.
|
||||
if warn {
|
||||
b.d.Warningf(diag.Message("error querying update status: %v"), err)
|
||||
b.d.Warningf(diag.Message("retrying in %vs... ^C to stop (this will not cancel the update)"),
|
||||
nextRetryTime.Seconds())
|
||||
}
|
||||
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
func printEvent(event apitype.UpdateEvent) {
|
||||
// Pluck out the string.
|
||||
if raw, ok := event.Fields["text"]; ok && raw != nil {
|
||||
|
|
|
@ -9,24 +9,26 @@ import (
|
|||
|
||||
type Acceptor struct {
|
||||
Accept Acceptance // a function that determines when to proceed.
|
||||
Progress Progress // an optional progress function.
|
||||
Delay *time.Duration // an optional delay duration.
|
||||
Backoff *float64 // an optional backoff multiplier.
|
||||
MaxDelay *time.Duration // an optional maximum delay duration.
|
||||
}
|
||||
|
||||
type Progress func(int) bool
|
||||
// Acceptance is meant to accept a condition. It returns true when this condition has succeeded, and false otherwise
|
||||
// (to which the retry framework responds by waiting and retrying after a certain period of time). If a non-nil error
|
||||
// is returned, retrying halts. The interface{} data may be used to return final values to the caller.
|
||||
type Acceptance func(try int, nextRetryTime time.Duration) (bool, interface{}, error)
|
||||
|
||||
const (
|
||||
DefaultDelay time.Duration = 250 * time.Millisecond // by default, delay by 250ms
|
||||
DefaultBackoff float64 = 2.0 // by default, backoff by 2.0
|
||||
DefaultDelay time.Duration = 100 * time.Millisecond // by default, delay by 100ms
|
||||
DefaultBackoff float64 = 1.5 // by default, backoff by 1.5x
|
||||
DefaultMaxDelay time.Duration = 5 * time.Second // by default, no more than 5 seconds
|
||||
)
|
||||
|
||||
type Acceptance func() (bool, error)
|
||||
|
||||
// Until waits until the acceptor accepts the current condition, or the context expires, whichever comes first. A
|
||||
// return boolean of true means the acceptor eventually accepted; a non-nil error means the acceptor returned an error.
|
||||
// If an acceptor accepts a condition after the context has expired, we ignore the expiration and return the condition.
|
||||
func Until(ctx context.Context, acceptor Acceptor) (bool, error) {
|
||||
func Until(ctx context.Context, acceptor Acceptor) (bool, interface{}, error) {
|
||||
expired := false
|
||||
|
||||
// Prepare our delay and backoff variables.
|
||||
|
@ -42,44 +44,58 @@ func Until(ctx context.Context, acceptor Acceptor) (bool, error) {
|
|||
} else {
|
||||
backoff = *acceptor.Backoff
|
||||
}
|
||||
|
||||
// If the context expires before the waiter has accepted, return.
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
expired = true
|
||||
}()
|
||||
|
||||
// Loop until the condition is accepted, or the context expires, whichever comes first.
|
||||
tries := 1
|
||||
for !expired {
|
||||
if b, err := acceptor.Accept(); b || err != nil {
|
||||
return b, err
|
||||
}
|
||||
if acceptor.Progress != nil && !acceptor.Progress(tries) {
|
||||
break // progress function asked to quit.
|
||||
}
|
||||
time.Sleep(delay)
|
||||
delay = time.Duration(float64(delay) * backoff)
|
||||
tries++
|
||||
var maxDelay time.Duration
|
||||
if acceptor.MaxDelay == nil {
|
||||
maxDelay = DefaultMaxDelay
|
||||
} else {
|
||||
maxDelay = *acceptor.MaxDelay
|
||||
}
|
||||
|
||||
return false, nil
|
||||
// If the context expires before the waiter has accepted, return.
|
||||
if ctx != nil {
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
expired = true
|
||||
}()
|
||||
}
|
||||
|
||||
// Loop until the condition is accepted, or the context expires, whichever comes first.
|
||||
var try int
|
||||
for !expired {
|
||||
// Compute the next retry time so the callback can access it.
|
||||
delay = time.Duration(float64(delay) * backoff)
|
||||
if delay > maxDelay {
|
||||
delay = maxDelay
|
||||
}
|
||||
|
||||
// Try the acceptance condition; if it returns true, or an error, we are done.
|
||||
b, data, err := acceptor.Accept(try, delay)
|
||||
if b || err != nil {
|
||||
return b, data, err
|
||||
}
|
||||
|
||||
// About to try again. Sleep, bump the retry count, and go around the horn again.
|
||||
time.Sleep(delay)
|
||||
try++
|
||||
}
|
||||
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
// UntilDeadline creates a child context with the given deadline, and then invokes the above Until function.
|
||||
func UntilDeadline(ctx context.Context, acceptor Acceptor, deadline time.Time) (bool, error) {
|
||||
func UntilDeadline(ctx context.Context, acceptor Acceptor, deadline time.Time) (bool, interface{}, error) {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithDeadline(ctx, deadline)
|
||||
b, err := Until(ctx, acceptor)
|
||||
b, data, err := Until(ctx, acceptor)
|
||||
cancel()
|
||||
return b, err
|
||||
return b, data, err
|
||||
}
|
||||
|
||||
// UntilTimeout creates a child context with the given timeout, and then invokes the above Until function.
|
||||
func UntilTimeout(ctx context.Context, acceptor Acceptor, timeout time.Duration) (bool, error) {
|
||||
func UntilTimeout(ctx context.Context, acceptor Acceptor, timeout time.Duration) (bool, interface{}, error) {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, timeout)
|
||||
b, err := Until(ctx, acceptor)
|
||||
b, data, err := Until(ctx, acceptor)
|
||||
cancel()
|
||||
return b, err
|
||||
return b, data, err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue