pulumi/cmd/backend_cloud.go
Chris Smith 84cd810112
Move program uploads to the CLI (#571)
In an effort to improve performance and overall reliability, this PR moves the responsibility of uploading the Pulumi program from the Pulumi Service to the CLI. (Part of fixing https://github.com/pulumi/pulumi-service/issues/313.)

Previously the CLI would send (the dozens of MiB) program archive to the Service, which would then upload the data to S3. Now the CLI sends the data to S3 directly, avoiding the unnecessary copying of data around.

The Service-side API changes are in https://github.com/pulumi/pulumi-service/pull/323. I tested previews, updates, and destroys running the service and PPC on localhost.

The PR refactors how we handle the three kinds of program updates, and just unifies them into a single method. This makes the diff look crazy, but the code should be much simpler. I'm not sure what to do about supporting all the engine options for the Cloud-variants of Pulumi commands; I suspect that's something that should be handled at a later time.
2017-11-15 13:27:28 -08:00

415 lines
12 KiB
Go

// Copyright 2016-2017, Pulumi Corporation. All rights reserved.
package cmd
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"time"
"github.com/pkg/errors"
"github.com/pulumi/pulumi/pkg/apitype"
"github.com/pulumi/pulumi/pkg/component"
"github.com/pulumi/pulumi/pkg/diag/colors"
"github.com/pulumi/pulumi/pkg/engine"
"github.com/pulumi/pulumi/pkg/pack"
"github.com/pulumi/pulumi/pkg/resource/config"
"github.com/pulumi/pulumi/pkg/util/archive"
"github.com/pulumi/pulumi/pkg/util/contract"
"github.com/pulumi/pulumi/pkg/workspace"
"github.com/pulumi/pulumi/pkg/tokens"
)
type pulumiCloudPulumiBackend struct{}
func (b *pulumiCloudPulumiBackend) CreateStack(stackName tokens.QName, opts StackCreationOptions) error {
projID, err := getCloudProjectIdentifier()
if err != nil {
return err
}
createStackReq := apitype.CreateStackRequest{
CloudName: opts.Cloud,
StackName: stackName.String(),
}
var createStackResp apitype.CreateStackResponse
path := fmt.Sprintf("/orgs/%s/programs/%s/%s/stacks", projID.Owner, projID.Repository, projID.Project)
if err := pulumiRESTCall("POST", path, &createStackReq, &createStackResp); err != nil {
return err
}
fmt.Printf("Created Stack '%s' hosted in Cloud '%s'\n", stackName, createStackResp.CloudName)
return nil
}
func (b *pulumiCloudPulumiBackend) GetStacks() ([]stackSummary, error) {
stacks, err := getCloudStacks()
if err != nil {
return nil, err
}
// Map to a summary slice.
var summaries []stackSummary
for _, stack := range stacks {
summary := stackSummary{
Name: stack.StackName,
LastDeploy: "n/a", // TODO(pulumi-service/issues#249): Make this info available.
ResourceCount: strconv.Itoa(len(stack.Resources)),
}
// If the stack hasn't been pushed to, it's resource count doesn't matter.
if stack.ActiveUpdate == "" {
summary.ResourceCount = "n/a"
}
summaries = append(summaries, summary)
}
return summaries, nil
}
func (b *pulumiCloudPulumiBackend) RemoveStack(stackName tokens.QName, force bool) error {
projID, err := getCloudProjectIdentifier()
if err != nil {
return err
}
queryParam := ""
if force {
queryParam = "?force=true"
}
path := fmt.Sprintf("/orgs/%s/programs/%s/%s/stacks/%s%s",
projID.Owner, projID.Repository, projID.Project, string(stackName), queryParam)
// TODO[pulumi/pulumi-service#196] When the service returns a well known response for "this stack still has resources and `force`
// was not true", we should sniff for that message and return errHasResources
return pulumiRESTCall("DELETE", path, nil, nil)
}
// updateKind is an enum for describing the kinds of updates we support.
type updateKind = int
const (
update updateKind = iota
preview
destroy
)
func (b *pulumiCloudPulumiBackend) Preview(stackName tokens.QName, debug bool, _ engine.PreviewOptions) error {
return updateStack(preview, stackName, debug)
}
func (b *pulumiCloudPulumiBackend) Update(stackName tokens.QName, debug bool, _ engine.DeployOptions) error {
return updateStack(update, stackName, debug)
}
func (b *pulumiCloudPulumiBackend) Destroy(stackName tokens.QName, debug bool, _ engine.DestroyOptions) error {
return updateStack(destroy, stackName, debug)
}
// updateStack performs a the provided type of update on a stack hosted in the Pulumi Cloud.
func updateStack(kind updateKind, stackName tokens.QName, debug bool) error {
// First create the update object.
projID, err := getCloudProjectIdentifier()
if err != nil {
return err
}
updateRequest, err := makeProgramUpdateRequest(stackName)
if err != nil {
return err
}
// Generate the URL we'll use for all the REST calls.
var action string
switch kind {
case update:
action = "update"
case preview:
action = "preview"
case destroy:
action = "destroy"
default:
contract.Failf("unsupported update kind: %v", kind)
}
restURLRoot := fmt.Sprintf(
"/orgs/%s/programs/%s/%s/stacks/%s/%s",
projID.Owner, projID.Repository, projID.Project, string(stackName), action)
// Create the initial update object.
var updateResponse apitype.UpdateProgramResponse
if err = pulumiRESTCall("POST", restURLRoot, &updateRequest, &updateResponse); err != nil {
return err
}
// Upload the program's contents to the signed URL if appropriate.
if kind != destroy {
err = uploadProgram(updateResponse.UploadURL, debug /* print upload size to STDOUT */)
if err != nil {
return err
}
}
// Start the update.
restURLWithUpdateID := fmt.Sprintf("%s/%s", restURLRoot, updateResponse.UpdateID)
var startUpdateResponse apitype.StartUpdateResponse
if err = pulumiRESTCall("POST", restURLWithUpdateID, nil /* no req body */, &startUpdateResponse); err != nil {
return err
}
if kind == update {
fmt.Printf("Updating Stack '%s' to version %d...\n", string(stackName), startUpdateResponse.Version)
}
// Wait for the update to complete, which also polls and renders event output to STDOUT.
status, err := waitForUpdate(restURLWithUpdateID)
fmt.Println() // The PPC's final message we print to STDOUT doesn't include a newline.
if err != nil {
return errors.Wrapf(err, "waiting for %s", action)
}
if status != apitype.StatusSucceeded {
return errors.Errorf("%s unsuccessful: status %v", action, status)
}
fmt.Printf("%s completed successfully.\n", action)
return nil
}
// uploadProgram archives the current Pulumi program and uploads it to a signed URL. "current"
// meaning whatever Pulumi program is found in the CWD or parent directory.
// If set, printSize will print the size of the data being uploaded.
func uploadProgram(uploadURL string, printSize bool) error {
cwd, err := os.Getwd()
if err != nil {
return errors.Wrap(err, "getting working directory")
}
programPath, err := workspace.DetectPackage(cwd)
if err != nil {
return errors.Wrap(err, "looking for Pulumi package")
}
if programPath == "" {
return errors.New("no Pulumi package found")
}
// programPath is the path to the Pulumi.yaml file. Need its parent folder.
programFolder := filepath.Dir(programPath)
archiveContents, err := archive.Process(programFolder)
if err != nil {
return errors.Wrap(err, "creating archive")
}
if printSize {
mb := float32(archiveContents.Len()) / (1024.0 * 1024.0)
fmt.Printf("Uploading %.2fMiB\n", mb)
}
parsedURL, err := url.Parse(uploadURL)
if err != nil {
return errors.Wrap(err, "parsing URL")
}
resp, err := http.DefaultClient.Do(&http.Request{
Method: "PUT",
URL: parsedURL,
ContentLength: int64(archiveContents.Len()),
Body: ioutil.NopCloser(archiveContents),
})
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return errors.Wrap(err, "upload failed")
}
return nil
}
func (b *pulumiCloudPulumiBackend) GetLogs(stackName tokens.QName, query component.LogQuery) ([]component.LogEntry, error) {
// TODO[pulumi/pulumi-service#227]: Relax these conditions once the service can take these arguments.
if query.StartTime != nil || query.EndTime != nil || query.Query != nil {
return nil, errors.New("not implemented")
}
projID, err := getCloudProjectIdentifier()
if err != nil {
return nil, err
}
var response apitype.LogsResult
path := fmt.Sprintf("/orgs/%s/programs/%s/%s/stacks/%s/logs", projID.Owner, projID.Repository, projID.Project, string(stackName))
if err = pulumiRESTCall("GET", path, nil, &response); err != nil {
return nil, err
}
logs := make([]component.LogEntry, 0, len(response.Logs))
for _, entry := range response.Logs {
logs = append(logs, component.LogEntry(entry))
}
return logs, nil
}
// getCloudStacks returns all stacks for the current repository x workspace on the Pulumi Cloud.
func getCloudStacks() ([]apitype.Stack, error) {
projID, err := getCloudProjectIdentifier()
if err != nil {
return nil, err
}
// Query all stacks for the project on Pulumi.
var stacks []apitype.Stack
path := fmt.Sprintf("/orgs/%s/programs/%s/%s/stacks", projID.Owner, projID.Repository, projID.Project)
if err := pulumiRESTCall("GET", path, nil, &stacks); err != nil {
return nil, err
}
return stacks, nil
}
// getDecryptedConfig returns the stack's configuration with any secrets in plain-text.
func getDecryptedConfig(stackName tokens.QName) (map[tokens.ModuleMember]string, error) {
cfg, err := getConfiguration(stackName)
if err != nil {
return nil, errors.Wrap(err, "getting configuration")
}
var decrypter config.ValueDecrypter = panicCrypter{}
if hasSecureValue(cfg) {
decrypter, err = getSymmetricCrypter()
if err != nil {
return nil, errors.Wrap(err, "getting symmetric crypter")
}
}
textConfig := make(map[tokens.ModuleMember]string)
for key := range cfg {
decrypted, err := cfg[key].Value(decrypter)
if err != nil {
return nil, errors.Wrap(err, "could not decrypt configuration value")
}
textConfig[key] = decrypted
}
return textConfig, nil
}
// getCloudProjectIdentifier returns information about the current repository and project, based on the current working
// directory.
func getCloudProjectIdentifier() (*cloudProjectIdentifier, error) {
w, err := newWorkspace()
if err != nil {
return nil, err
}
cwd, err := os.Getwd()
if err != nil {
return nil, err
}
path, err := workspace.DetectPackage(cwd)
if err != nil {
return nil, err
}
pkg, err := pack.Load(path)
if err != nil {
return nil, err
}
repo := w.Repository()
return &cloudProjectIdentifier{
Owner: repo.Owner,
Repository: repo.Name,
Project: pkg.Name,
}, nil
}
// makeProgramUpdateRequest constructs the apitype.UpdateProgramRequest based on the local machine state.
func makeProgramUpdateRequest(stackName tokens.QName) (apitype.UpdateProgramRequest, error) {
// Zip up the Pulumi program's directory, which may be a parent of CWD.
cwd, err := os.Getwd()
if err != nil {
return apitype.UpdateProgramRequest{}, errors.Errorf("getting working directory: %v", err)
}
programPath, err := workspace.DetectPackage(cwd)
if err != nil {
return apitype.UpdateProgramRequest{}, errors.Errorf("looking for Pulumi package: %v", err)
}
if programPath == "" {
return apitype.UpdateProgramRequest{}, errors.Errorf("no Pulumi package found")
}
// Load the package, since we now require passing the Runtime with the update request.
pkg, err := pack.Load(programPath)
if err != nil {
return apitype.UpdateProgramRequest{}, err
}
description := ""
if pkg.Description != nil {
description = *pkg.Description
}
// Gather up configuration.
// TODO(pulumi-service/issues/221): Have pulumi.com handle the encryption/decryption.
textConfig, err := getDecryptedConfig(stackName)
if err != nil {
return apitype.UpdateProgramRequest{}, errors.Wrap(err, "getting decrypted configuration")
}
return apitype.UpdateProgramRequest{
Name: pkg.Name,
Runtime: pkg.Runtime,
Description: description,
Config: textConfig,
}, nil
}
// waitForUpdate waits for the current update of a Pulumi program to reach a terminal state. Returns the
// final state. "path" is the URL endpoint to poll for updates.
func waitForUpdate(path string) (apitype.UpdateStatus, error) {
time.Sleep(5 * time.Second)
// Events occur in sequence, filter out all the ones we have seen before in each request.
eventIndex := "0"
for {
time.Sleep(2 * time.Second)
var updateResults apitype.UpdateResults
pathWithIndex := fmt.Sprintf("%s?afterIndex=%s", path, eventIndex)
if err := pulumiRESTCall("GET", pathWithIndex, nil, &updateResults); err != nil {
// If our request to the Pulumi Service returned a 504 (Gateway Timeout), ignore it
// and keep continuing.
// TODO(pulumi/pulumi-ppc/issues/60): Elminate these timeouts all together.
if errResp, ok := err.(*apitype.ErrorResponse); ok && errResp.Code == 504 {
time.Sleep(5 * time.Second)
continue
}
}
for _, event := range updateResults.Events {
printEvent(event)
eventIndex = event.Index
}
// Check if in termal state.
updateStatus := apitype.UpdateStatus(updateResults.Status)
switch updateStatus {
case apitype.StatusFailed:
fallthrough
case apitype.StatusSucceeded:
return updateStatus, nil
}
}
}
func printEvent(event apitype.UpdateEvent) {
stream := os.Stdout // Ignoring event.Kind which could be StderrEvent.
rawEntry, ok := event.Fields["text"]
if !ok {
return
}
text := rawEntry.(string)
if colorize, ok := event.Fields["colorize"].(bool); ok && colorize {
text = colors.ColorizeText(text)
}
fmt.Fprint(stream, text)
}