Use ContinuationToken (#1220)
* Use ContinuationToken * Rename 'afterIndex' to 'continuationToken' for clarity
This commit is contained in:
parent
f61998ea99
commit
fe3d854bc5
|
@ -1067,13 +1067,13 @@ func (b *cloudBackend) waitForUpdate(actionLabel string, update client.UpdateIde
|
|||
}()
|
||||
go displayEvents(strings.ToLower(actionLabel), events, done, displayOpts)
|
||||
|
||||
// Events occur in sequence, filter out all the ones we have seen before in each request.
|
||||
eventIndex := "0"
|
||||
// The UpdateEvents API returns a continuation token to only get events after the previous call.
|
||||
var continuationToken *string
|
||||
for {
|
||||
// Query for the latest update results, including log entries so we can provide active status updates.
|
||||
_, results, err := retry.Until(context.Background(), retry.Acceptor{
|
||||
Accept: func(try int, nextRetryTime time.Duration) (bool, interface{}, error) {
|
||||
return b.tryNextUpdate(update, eventIndex, try, nextRetryTime)
|
||||
return b.tryNextUpdate(update, continuationToken, try, nextRetryTime)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -1084,12 +1084,11 @@ func (b *cloudBackend) waitForUpdate(actionLabel string, update client.UpdateIde
|
|||
updateResults := results.(apitype.UpdateResults)
|
||||
for _, event := range updateResults.Events {
|
||||
events <- displayEvent{Kind: UpdateEvent, Payload: event}
|
||||
eventIndex = event.Index
|
||||
}
|
||||
|
||||
// Check if in termal state and if so return.
|
||||
switch updateResults.Status {
|
||||
case apitype.StatusFailed, apitype.StatusSucceeded:
|
||||
continuationToken = updateResults.ContinuationToken
|
||||
// A nil continuation token means there are no more events to read and the update has finished.
|
||||
if continuationToken == nil {
|
||||
return updateResults.Status, nil
|
||||
}
|
||||
}
|
||||
|
@ -1140,13 +1139,13 @@ func displayEvents(
|
|||
}
|
||||
}
|
||||
|
||||
// tryNextUpdate tries to get the next update for a Pulumi program. This may time or error out, which resutls in a
|
||||
// tryNextUpdate tries to get the next update for a Pulumi program. This may time or error out, which results in a
|
||||
// false returned in the first return value. If a non-nil error is returned, this operation should fail.
|
||||
func (b *cloudBackend) tryNextUpdate(update client.UpdateIdentifier, afterIndex string, try int,
|
||||
func (b *cloudBackend) tryNextUpdate(update client.UpdateIdentifier, continuationToken *string, try int,
|
||||
nextRetryTime time.Duration) (bool, interface{}, error) {
|
||||
|
||||
// If there is no error, we're done.
|
||||
results, err := b.client.GetUpdateEvents(update, afterIndex)
|
||||
results, err := b.client.GetUpdateEvents(update, continuationToken)
|
||||
if err == nil {
|
||||
return true, results, nil
|
||||
}
|
||||
|
|
|
@ -426,9 +426,12 @@ func (pc *Client) StartUpdate(update UpdateIdentifier, tags map[apitype.StackTag
|
|||
return resp.Version, resp.Token, nil
|
||||
}
|
||||
|
||||
// GetUpdateEvents returns all events for the indicated update after the given index.
|
||||
func (pc *Client) GetUpdateEvents(update UpdateIdentifier, afterIndex string) (apitype.UpdateResults, error) {
|
||||
path := fmt.Sprintf("%s?afterIndex=%s", getUpdatePath(update), afterIndex)
|
||||
// GetUpdateEvents returns all events, taking an optional continuation token from a previous call.
|
||||
func (pc *Client) GetUpdateEvents(update UpdateIdentifier, continuationToken *string) (apitype.UpdateResults, error) {
|
||||
path := getUpdatePath(update)
|
||||
if continuationToken != nil {
|
||||
path += fmt.Sprintf("?continuationToken=%s", *continuationToken)
|
||||
}
|
||||
|
||||
var results apitype.UpdateResults
|
||||
if err := pc.restCall("GET", path, nil, nil, &results); err != nil {
|
||||
|
|
Loading…
Reference in a new issue