pulumi/pkg/backend/filestate/backend.go
joeduffy 234c3dfec9 Add a --json flag to the preview command
This change adds a --json flag to the preview command, enabling
basic JSON serialization of preview plans. This effectively flattens
the engine event stream into a preview structure that contains a list
of steps, diagnostics, and summary information. Each step contains
the deep serialization of resource state, in addition to metadata about
the step, such as what kind of operation it entails.

This is a partial implementation of pulumi/pulumi#2390. In particular,
we only support --json on the `preview` command itself, and not `up`,
meaning that it isn't possible to serialize the result of an actual
deployment yet (thereby limiting what you can do with outputs, etc).
2019-04-25 17:37:16 -07:00
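For illustration, the flattened preview described above might be modeled roughly like the Go sketch below. These type and field names are hypothetical placeholders for this note, not the actual apitype definitions introduced by the change:

	// previewDigest sketches a flattened preview: the planned steps, any diagnostics
	// emitted while planning, and summary information about the proposed changes.
	type previewDigest struct {
		Steps         []previewStep  // one entry per planned step
		Diagnostics   []string       // diagnostics produced during the preview
		ChangeSummary map[string]int // e.g. counts of creates, updates, deletes, sames
	}

	// previewStep carries metadata about a single step, such as the kind of operation it
	// entails, plus a deep serialization of the affected resource's projected state.
	type previewStep struct {
		Op       string                 // operation kind: create, update, delete, same, ...
		URN      string                 // the resource this step affects
		NewState map[string]interface{} // deep serialization of the resource state
	}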


// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filestate

import (
	"context"
	"encoding/json"
	"fmt"
	"net/url"
	"os"
	"os/user"
	"path"
	"path/filepath"
	"strings"
	"time"

	"gocloud.dev/blob"
	_ "gocloud.dev/blob/azureblob" // driver for azblob://
	_ "gocloud.dev/blob/fileblob"  // driver for file://
	_ "gocloud.dev/blob/gcsblob"   // driver for gs://
	_ "gocloud.dev/blob/s3blob"    // driver for s3://

	"github.com/pkg/errors"

	"github.com/pulumi/pulumi/pkg/apitype"
	"github.com/pulumi/pulumi/pkg/backend"
	"github.com/pulumi/pulumi/pkg/backend/display"
	"github.com/pulumi/pulumi/pkg/diag"
	"github.com/pulumi/pulumi/pkg/diag/colors"
	"github.com/pulumi/pulumi/pkg/encoding"
	"github.com/pulumi/pulumi/pkg/engine"
	"github.com/pulumi/pulumi/pkg/operations"
	"github.com/pulumi/pulumi/pkg/resource/config"
	"github.com/pulumi/pulumi/pkg/resource/deploy"
	"github.com/pulumi/pulumi/pkg/resource/edit"
	"github.com/pulumi/pulumi/pkg/resource/stack"
	"github.com/pulumi/pulumi/pkg/tokens"
	"github.com/pulumi/pulumi/pkg/util/contract"
	"github.com/pulumi/pulumi/pkg/util/logging"
	"github.com/pulumi/pulumi/pkg/util/result"
	"github.com/pulumi/pulumi/pkg/workspace"
)

// Backend extends the base backend interface with specific information about local backends.
type Backend interface {
	backend.Backend
	local() // at the moment, no local specific info, so just use a marker function.
}
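
// localBackend is a Backend implementation that manages stacks via a blob bucket
// (local files by default; s3://, gs://, and azblob:// buckets are also supported).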
type localBackend struct {
	d               diag.Sink
	url             string
	stackConfigFile string
	bucket          *blob.Bucket
}
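
// localBackendReference is a stack reference for the local backend; it carries only the
// stack name.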
type localBackendReference struct {
	name tokens.QName
}

func (r localBackendReference) String() string {
	return string(r.name)
}

func (r localBackendReference) Name() tokens.QName {
	return r.name
}
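
// IsFileStateBackendURL returns true if the given URL uses a scheme understood by the
// registered blob drivers (file://, s3://, gs://, azblob://) and can therefore be
// served by this backend.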
func IsFileStateBackendURL(urlstr string) bool {
	u, err := url.Parse(urlstr)
	if err != nil {
		return false
	}
	return blob.DefaultURLMux().ValidBucketScheme(u.Scheme)
}
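
// New creates a local backend rooted at the given bucket URL. file:// URLs are resolved
// to absolute paths (file://~ resolves to the current user's home directory) and the
// directory is created if it does not yet exist.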
func New(d diag.Sink, u, stackConfigFile string) (Backend, error) {
	if !IsFileStateBackendURL(u) {
		return nil, errors.Errorf("local URL %s has an illegal prefix; expected one of: %s",
			u, strings.Join(blob.DefaultURLMux().BucketSchemes(), ", "))
	}

	var localPath string
	if u == "file://~" {
		usr, err := user.Current()
		if err != nil {
			return nil, errors.Wrap(err, "Could not determine current user")
		}
		localPath = usr.HomeDir
	} else if strings.HasPrefix(u, "file://") {
		// For file:// backend, ensure a relative path is resolved. fileblob only supports absolute paths.
		localPath, _ = filepath.Abs(strings.TrimPrefix(u, "file://"))
	}

	if localPath != "" {
		u2 := url.URL{Scheme: "file", Path: localPath}
		u = u2.String()
		if err := os.MkdirAll(localPath, 0700); err != nil {
			return nil, errors.Wrap(err, "An IO error occurred during the current operation")
		}
	}

	bucket, err := blob.OpenBucket(context.TODO(), u)
	if err != nil {
		return nil, errors.Wrapf(err, "unable to open bucket %s", u)
	}

	return &localBackend{
		d:               d,
		url:             u,
		stackConfigFile: stackConfigFile,
		bucket:          bucket,
	}, nil
}
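
// Login creates the backend and records the URL (with an empty access token) in the
// workspace credentials, so the CLI treats this backend as the current one.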
func Login(d diag.Sink, url, stackConfigFile string) (Backend, error) {
	be, err := New(d, url, stackConfigFile)
	if err != nil {
		return nil, err
	}
	return be, workspace.StoreAccessToken(url, "", true)
}

func (b *localBackend) local() {}
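
// Name returns a name for this backend; the machine's hostname is used, falling back to
// "local" if it cannot be determined.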
func (b *localBackend) Name() string {
	name, err := os.Hostname()
	contract.IgnoreError(err)
	if name == "" {
		name = "local"
	}
	return name
}

func (b *localBackend) URL() string {
	return b.url
}

func (b *localBackend) StateDir() string {
	return workspace.BookkeepingDir
}

func (b *localBackend) ParseStackReference(stackRefName string) (backend.StackReference, error) {
	return localBackendReference{name: tokens.QName(stackRefName)}, nil
}

func (b *localBackend) CreateStack(ctx context.Context, stackRef backend.StackReference,
	opts interface{}) (backend.Stack, error) {

	contract.Requiref(opts == nil, "opts", "local stacks do not support any options")

	stackName := stackRef.Name()
	if stackName == "" {
		return nil, errors.New("invalid empty stack name")
	}

	if _, _, _, err := b.getStack(stackName); err == nil {
		return nil, &backend.StackAlreadyExistsError{StackName: string(stackName)}
	}

	tags, err := backend.GetEnvironmentTagsForCurrentStack()
	if err != nil {
		return nil, errors.Wrap(err, "getting stack tags")
	}
	if err = backend.ValidateStackProperties(string(stackName), tags); err != nil {
		return nil, errors.Wrap(err, "validating stack properties")
	}

	file, err := b.saveStack(stackName, nil, nil)
	if err != nil {
		return nil, err
	}

	stack := newStack(stackRef, file, nil, nil, b)
	fmt.Printf("Created stack '%s'\n", stack.Ref())

	return stack, nil
}

func (b *localBackend) GetStack(ctx context.Context, stackRef backend.StackReference) (backend.Stack, error) {
	stackName := stackRef.Name()
	config, snapshot, path, err := b.getStack(stackName)
	switch {
	case os.IsNotExist(errors.Cause(err)):
		return nil, nil
	case err != nil:
		return nil, err
	default:
		return newStack(stackRef, path, config, snapshot, b), nil
	}
}

func (b *localBackend) ListStacks(
	ctx context.Context, projectFilter *tokens.PackageName) ([]backend.StackSummary, error) {
	stacks, err := b.getLocalStacks()
	if err != nil {
		return nil, err
	}

	var results []backend.StackSummary
	for _, stackName := range stacks {
		stack, err := b.GetStack(ctx, localBackendReference{name: stackName})
		if err != nil {
			return nil, err
		}
		localStack, ok := stack.(*localStack)
		contract.Assertf(ok, "localBackend GetStack returned non-localStack")
		results = append(results, newLocalStackSummary(localStack))
	}

	return results, nil
}

func (b *localBackend) RemoveStack(ctx context.Context, stackRef backend.StackReference, force bool) (bool, error) {
	stackName := stackRef.Name()
	_, snapshot, _, err := b.getStack(stackName)
	if err != nil {
		return false, err
	}

	// Don't remove stacks that still have resources.
	if !force && snapshot != nil && len(snapshot.Resources) > 0 {
		return true, errors.New("refusing to remove stack because it still contains resources")
	}

	return false, b.removeStack(stackName)
}

func (b *localBackend) RenameStack(ctx context.Context, stackRef backend.StackReference, newName tokens.QName) error {
	stackName := stackRef.Name()
	cfg, snap, _, err := b.getStack(stackName)
	if err != nil {
		return err
	}

	// Ensure the destination stack does not already exist.
	_, err = os.Stat(b.stackPath(newName))
	if err == nil {
		return errors.Errorf("a stack named %s already exists", newName)
	} else if !os.IsNotExist(err) {
		return err
	}

	// Rewrite the checkpoint and save it with the new name.
	if err = edit.RenameStack(snap, newName); err != nil {
		return err
	}
	if _, err = b.saveStack(newName, cfg, snap); err != nil {
		return err
	}

	// To remove the old stack, just make a backup of the file and don't write out anything new.
	file := b.stackPath(stackName)
	backupTarget(b.bucket, file)

	// And move the history over as well.
	oldHistoryDir := b.historyDirectory(stackName)
	newHistoryDir := b.historyDirectory(newName)
	return os.Rename(oldHistoryDir, newHistoryDir)
}
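
// GetStackCrypter returns the crypter used to encrypt and decrypt secret configuration
// values for the given stack, backed by the stack's configuration file.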
func (b *localBackend) GetStackCrypter(stackRef backend.StackReference) (config.Crypter, error) {
	return symmetricCrypter(stackRef.Name(), b.stackConfigFile)
}

func (b *localBackend) GetLatestConfiguration(ctx context.Context,
	stackRef backend.StackReference) (config.Map, error) {

	hist, err := b.GetHistory(ctx, stackRef)
	if err != nil {
		return nil, err
	}
	if len(hist) == 0 {
		return nil, backend.ErrNoPreviousDeployment
	}

	return hist[0].Config, nil
}

func (b *localBackend) Preview(ctx context.Context, stackRef backend.StackReference,
	op backend.UpdateOperation) (engine.ResourceChanges, result.Result) {

	// Get the stack.
	stack, err := b.GetStack(ctx, stackRef)
	if err != nil {
		return nil, result.FromError(err)
	}

	// We can skip PreviewThenPromptThenExecute and just go straight to Execute.
	opts := backend.ApplierOptions{
		DryRun:   true,
		ShowLink: true,
	}
	return b.apply(ctx, apitype.PreviewUpdate, stack, op, opts, nil /*events*/)
}

func (b *localBackend) Update(ctx context.Context, stackRef backend.StackReference,
	op backend.UpdateOperation) (engine.ResourceChanges, result.Result) {

	stack, err := b.GetStack(ctx, stackRef)
	if err != nil {
		return nil, result.FromError(err)
	}
	return backend.PreviewThenPromptThenExecute(ctx, apitype.UpdateUpdate, stack, op, b.apply)
}

func (b *localBackend) Refresh(ctx context.Context, stackRef backend.StackReference,
	op backend.UpdateOperation) (engine.ResourceChanges, result.Result) {

	stack, err := b.GetStack(ctx, stackRef)
	if err != nil {
		return nil, result.FromError(err)
	}
	return backend.PreviewThenPromptThenExecute(ctx, apitype.RefreshUpdate, stack, op, b.apply)
}

func (b *localBackend) Destroy(ctx context.Context, stackRef backend.StackReference,
	op backend.UpdateOperation) (engine.ResourceChanges, result.Result) {

	stack, err := b.GetStack(ctx, stackRef)
	if err != nil {
		return nil, result.FromError(err)
	}
	return backend.PreviewThenPromptThenExecute(ctx, apitype.DestroyUpdate, stack, op, b.apply)
}

// apply actually performs the provided type of update on a locally hosted stack.
func (b *localBackend) apply(
	ctx context.Context, kind apitype.UpdateKind, stack backend.Stack,
	op backend.UpdateOperation, opts backend.ApplierOptions,
	events chan<- engine.Event) (engine.ResourceChanges, result.Result) {

	stackRef := stack.Ref()
	stackName := stackRef.Name()

	// Print a banner so it's clear this is a local deployment.
	actionLabel := backend.ActionLabel(kind, opts.DryRun)
	_, err := fmt.Fprintf(os.Stderr, op.Opts.Display.Color.Colorize(
		colors.SpecHeadline+"%s (%s):"+colors.Reset+"\n"), actionLabel, stackRef)
	contract.IgnoreError(err)

	// Start the update.
	update, err := b.newUpdate(stackName, op.Proj, op.Root)
	if err != nil {
		return nil, result.FromError(err)
	}

	// Spawn a display loop to show events on the CLI.
	displayEvents := make(chan engine.Event)
	displayDone := make(chan bool)
	go display.ShowEvents(
		strings.ToLower(actionLabel), kind, stackName, op.Proj.Name,
		displayEvents, displayDone, op.Opts.Display, opts.DryRun)

	// Create a separate event channel for engine events that we'll pipe to both listening streams.
	engineEvents := make(chan engine.Event)

	scope := op.Scopes.NewScope(engineEvents, opts.DryRun)
	eventsDone := make(chan bool)
	go func() {
		// Pull in all events from the engine and send them to the two listeners.
		for e := range engineEvents {
			displayEvents <- e

			// If the caller also wants to see the events, stream them there also.
			if events != nil {
				events <- e
			}
		}
		close(eventsDone)
	}()

	// Create the management machinery.
	persister := b.newSnapshotPersister(stackName)
	manager := backend.NewSnapshotManager(persister, update.GetTarget().Snapshot)
	engineCtx := &engine.Context{
		Cancel:          scope.Context(),
		Events:          engineEvents,
		SnapshotManager: manager,
		BackendClient:   backend.NewBackendClient(b),
	}

	// Perform the update
	start := time.Now().Unix()
	var changes engine.ResourceChanges
	var updateRes result.Result
	switch kind {
	case apitype.PreviewUpdate:
		changes, updateRes = engine.Update(update, engineCtx, op.Opts.Engine, true)
	case apitype.UpdateUpdate:
		changes, updateRes = engine.Update(update, engineCtx, op.Opts.Engine, opts.DryRun)
	case apitype.RefreshUpdate:
		changes, updateRes = engine.Refresh(update, engineCtx, op.Opts.Engine, opts.DryRun)
	case apitype.DestroyUpdate:
		changes, updateRes = engine.Destroy(update, engineCtx, op.Opts.Engine, opts.DryRun)
	default:
		contract.Failf("Unrecognized update kind: %s", kind)
	}
	end := time.Now().Unix()

	// Wait for the display to finish showing all the events.
	<-displayDone
	scope.Close() // Don't take any cancellations anymore, we're shutting down.
	close(engineEvents)
	contract.IgnoreClose(manager)

	// Make sure the goroutine writing to displayEvents and events has exited before proceeding.
	<-eventsDone
	close(displayEvents)

	// Save update results.
	backendUpdateResult := backend.SucceededResult
	if updateRes != nil {
		backendUpdateResult = backend.FailedResult
	}
	info := backend.UpdateInfo{
		Kind:        kind,
		StartTime:   start,
		Message:     op.M.Message,
		Environment: op.M.Environment,
		Config:      update.GetTarget().Config,
		Result:      backendUpdateResult,
		EndTime:     end,
		// IDEA: it would be nice to populate the *Deployment, so that addToHistory below doesn't need to
		// rudely assume it knows where the checkpoint file is on disk as it makes a copy of it. This isn't
		// trivial to achieve today given the event driven nature of plan-walking, however.
		ResourceChanges: changes,
	}

	var saveErr error
	var backupErr error
	if !opts.DryRun {
		saveErr = b.addToHistory(stackName, info)
		backupErr = b.backupStack(stackName)
	}

	if updateRes != nil {
		// We swallow saveErr and backupErr as they are less important than the updateErr.
		return changes, updateRes
	}

	if saveErr != nil {
		// We swallow backupErr as it is less important than the saveErr.
		return changes, result.FromError(errors.Wrap(saveErr, "saving update info"))
	}

	if backupErr != nil {
		return changes, result.FromError(errors.Wrap(backupErr, "saving backup"))
	}

	// Make sure to print a link to the stack's checkpoint before exiting.
	if opts.ShowLink {
		// Note we get a real signed link for aws/azure/gcp links. But no such option exists for
		// file:// links so we manually create the link ourselves.
		var link string
		if strings.HasPrefix(b.url, "file://") {
			u, _ := url.Parse(b.url)
			u.Path = path.Join(u.Path, b.stackPath(stackName))
			link = u.String()
		} else {
			link, err = b.bucket.SignedURL(context.TODO(), b.stackPath(stackName), nil)
			if err != nil {
				return changes, result.FromError(errors.Wrap(err, "Could not get signed url for stack location"))
			}
		}

		_, err := fmt.Fprintf(os.Stderr,
			op.Opts.Display.Color.Colorize(
				colors.SpecHeadline+"Permalink: "+
					colors.Underline+colors.BrightBlue+"%s"+colors.Reset+"\n"), link)
		contract.IgnoreError(err)
	}

	return changes, nil
}

func (b *localBackend) GetHistory(ctx context.Context, stackRef backend.StackReference) ([]backend.UpdateInfo, error) {
	stackName := stackRef.Name()
	updates, err := b.getHistory(stackName)
	if err != nil {
		return nil, err
	}
	return updates, nil
}

func (b *localBackend) GetLogs(ctx context.Context, stackRef backend.StackReference,
	query operations.LogQuery) ([]operations.LogEntry, error) {

	stackName := stackRef.Name()
	target, err := b.getTarget(stackName)
	if err != nil {
		return nil, err
	}

	return GetLogsForTarget(target, query)
}

// GetLogsForTarget fetches stack logs using the config, decrypter, and checkpoint in the given target.
func GetLogsForTarget(target *deploy.Target, query operations.LogQuery) ([]operations.LogEntry, error) {
	contract.Assert(target != nil)
	contract.Assert(target.Snapshot != nil)

	config, err := target.Config.Decrypt(target.Decrypter)
	if err != nil {
		return nil, err
	}

	components := operations.NewResourceTree(target.Snapshot.Resources)
	ops := components.OperationsProvider(config)
	logs, err := ops.GetLogs(query)
	if logs == nil {
		return nil, err
	}
	return *logs, err
}
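
// ExportDeployment returns the stack's current snapshot as an untyped (version 3)
// deployment document; an empty snapshot is exported if the stack has no checkpoint yet.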
func (b *localBackend) ExportDeployment(ctx context.Context,
	stackRef backend.StackReference) (*apitype.UntypedDeployment, error) {

	stackName := stackRef.Name()
	_, snap, _, err := b.getStack(stackName)
	if err != nil {
		return nil, err
	}

	if snap == nil {
		snap = deploy.NewSnapshot(deploy.Manifest{}, nil, nil)
	}

	data, err := json.Marshal(stack.SerializeDeployment(snap))
	if err != nil {
		return nil, err
	}

	return &apitype.UntypedDeployment{
		Version:    3,
		Deployment: json.RawMessage(data),
	}, nil
}
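
// ImportDeployment replaces the stack's snapshot with the supplied deployment, keeping
// the stack's existing configuration.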
func (b *localBackend) ImportDeployment(ctx context.Context, stackRef backend.StackReference,
	deployment *apitype.UntypedDeployment) error {

	stackName := stackRef.Name()
	config, _, _, err := b.getStack(stackName)
	if err != nil {
		return err
	}

	snap, err := stack.DeserializeUntypedDeployment(deployment)
	if err != nil {
		return err
	}

	_, err = b.saveStack(stackName, config, snap)
	return err
}

func (b *localBackend) Logout() error {
	return workspace.DeleteAccessToken(b.url)
}

func (b *localBackend) CurrentUser() (string, error) {
	user, err := user.Current()
	if err != nil {
		return "", err
	}
	return user.Username, nil
}
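
// getLocalStacks enumerates the checkpoint files in the backend's stack directory and
// returns the names of the stacks that could be read successfully.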
func (b *localBackend) getLocalStacks() ([]tokens.QName, error) {
	var stacks []tokens.QName

	// Read the stack directory.
	path := b.stackPath("")

	files, err := listBucket(b.bucket, path)
	if err != nil {
		return nil, errors.Wrap(err, "error listing stacks")
	}

	for _, file := range files {
		// Ignore directories.
		if file.IsDir {
			continue
		}

		// Skip files without valid extensions (e.g., *.bak files).
		stackfn := objectName(file)
		ext := filepath.Ext(stackfn)
		if _, has := encoding.Marshalers[ext]; !has {
			continue
		}

		// Read in this stack's information.
		name := tokens.QName(stackfn[:len(stackfn)-len(ext)])
		_, _, _, err := b.getStack(name)
		if err != nil {
			logging.V(5).Infof("error reading stack: %v (%v) skipping", name, err)
			continue // failure reading the stack information.
		}

		stacks = append(stacks, name)
	}

	return stacks, nil
}

// GetStackTags fetches the stack's existing tags.
func (b *localBackend) GetStackTags(ctx context.Context,
	stackRef backend.StackReference) (map[apitype.StackTagName]string, error) {

	// The local backend does not currently persist tags.
	return nil, errors.New("stack tags not supported in --local mode")
}

// UpdateStackTags updates the stack's tags, replacing all existing tags.
func (b *localBackend) UpdateStackTags(ctx context.Context,
	stackRef backend.StackReference, tags map[apitype.StackTagName]string) error {

	// The local backend does not currently persist tags.
	return errors.New("stack tags not supported in --local mode")
}