Implement resource plans in the engine

This commit is contained in:
Pat Gavlin 2020-11-15 20:26:21 -08:00
parent e71fd81fe8
commit 33e8980cee
13 changed files with 291 additions and 40 deletions

View file

@ -23,7 +23,7 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
)
func Destroy(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (ResourceChanges, result.Result) {
func Destroy(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (Plan, ResourceChanges, result.Result) {
contract.Require(u != nil, "u")
contract.Require(ctx != nil, "ctx")
@ -31,13 +31,13 @@ func Destroy(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (Resou
info, err := newDeploymentContext(u, "destroy", ctx.ParentSpan)
if err != nil {
return nil, result.FromError(err)
return nil, nil, result.FromError(err)
}
defer info.Close()
emitter, err := makeEventEmitter(ctx.Events, u)
if err != nil {
return nil, result.FromError(err)
return nil, nil, result.FromError(err)
}
defer emitter.Close()

View file

@ -21,7 +21,7 @@ import (
)
func Import(u UpdateInfo, ctx *Context, opts UpdateOptions, imports []deploy.Import,
dryRun bool) (ResourceChanges, result.Result) {
dryRun bool) (Plan, ResourceChanges, result.Result) {
contract.Require(u != nil, "u")
contract.Require(ctx != nil, "ctx")
@ -30,13 +30,13 @@ func Import(u UpdateInfo, ctx *Context, opts UpdateOptions, imports []deploy.Imp
info, err := newDeploymentContext(u, "import", ctx.ParentSpan)
if err != nil {
return nil, result.FromError(err)
return nil, nil, result.FromError(err)
}
defer info.Close()
emitter, err := makeEventEmitter(ctx.Events, u)
if err != nil {
return nil, result.FromError(err)
return nil, nil, result.FromError(err)
}
defer emitter.Close()

View file

@ -2588,3 +2588,81 @@ func TestComponentDeleteDependencies(t *testing.T) {
}
p.Run(t, nil)
}
func TestPlannedUpdate(t *testing.T) {
loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{
CreateF: func(urn resource.URN, news resource.PropertyMap, timeout float64,
preview bool) (resource.ID, resource.PropertyMap, resource.Status, error) {
return "created-id", news, resource.StatusOK, nil
},
UpdateF: func(urn resource.URN, id resource.ID, olds, news resource.PropertyMap, timeout float64,
ignoreChanges []string, preview bool) (resource.PropertyMap, resource.Status, error) {
return news, resource.StatusOK, nil
},
}, nil
}),
}
var ins resource.PropertyMap
program := deploytest.NewLanguageRuntime(func(_ plugin.RunInfo, monitor *deploytest.ResourceMonitor) error {
_, _, _, err := monitor.RegisterResource("pkgA:m:typA", "resA", true, deploytest.ResourceOptions{
Inputs: ins,
})
assert.NoError(t, err)
return nil
})
host := deploytest.NewPluginHost(nil, nil, program, loaders...)
p := &TestPlan{
Options: UpdateOptions{Host: host},
}
project := p.GetProject()
// Generate a plan.
computed := interface{}(resource.Computed{Element: resource.NewStringProperty("")})
ins = resource.NewPropertyMapFromMap(map[string]interface{}{
"foo": "bar",
"baz": map[string]interface{}{
"a": 42,
"b": computed,
},
"qux": []interface{}{
computed,
24,
},
"zed": computed,
})
plan, res := TestOp(Update).Plan(project, p.GetTarget(nil), p.Options, p.BackendClient, nil)
assert.Nil(t, res)
// Run an update using the plan.
ins = resource.NewPropertyMapFromMap(map[string]interface{}{
"qux": []interface{}{
"alpha",
24,
},
})
p.Options.Plan = plan
snap, res := TestOp(Update).Run(project, p.GetTarget(nil), p.Options, false, p.BackendClient, nil)
assert.Nil(t, res)
// Check the resource's state.
if !assert.Len(t, snap.Resources, 2) {
return
}
expected := resource.NewPropertyMapFromMap(map[string]interface{}{
"foo": "bar",
"baz": map[string]interface{}{
"a": 42,
},
"qux": []interface{}{
"alpha",
24,
},
})
assert.Equal(t, expected, snap.Resources[1].Outputs)
}

View file

@ -39,16 +39,25 @@ func (u *updateInfo) GetTarget() *deploy.Target {
}
func ImportOp(imports []deploy.Import) TestOp {
return TestOp(func(info UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (ResourceChanges, result.Result) {
return TestOp(func(info UpdateInfo, ctx *Context, opts UpdateOptions,
dryRun bool) (Plan, ResourceChanges, result.Result) {
return Import(info, ctx, opts, imports, dryRun)
})
}
type TestOp func(UpdateInfo, *Context, UpdateOptions, bool) (ResourceChanges, result.Result)
type TestOp func(UpdateInfo, *Context, UpdateOptions, bool) (Plan, ResourceChanges, result.Result)
type ValidateFunc func(project workspace.Project, target deploy.Target, entries JournalEntries,
events []Event, res result.Result) result.Result
func (op TestOp) Plan(project workspace.Project, target deploy.Target, opts UpdateOptions,
backendClient deploy.BackendClient, validate ValidateFunc) (Plan, result.Result) {
plan, _, res := op.runWithContext(context.Background(), project, target, opts, true, backendClient, validate)
return plan, res
}
func (op TestOp) Run(project workspace.Project, target deploy.Target, opts UpdateOptions,
dryRun bool, backendClient deploy.BackendClient, validate ValidateFunc) (*deploy.Snapshot, result.Result) {
@ -60,6 +69,15 @@ func (op TestOp) RunWithContext(
target deploy.Target, opts UpdateOptions, dryRun bool,
backendClient deploy.BackendClient, validate ValidateFunc) (*deploy.Snapshot, result.Result) {
_, snap, res := op.runWithContext(callerCtx, project, target, opts, dryRun, backendClient, validate)
return snap, res
}
func (op TestOp) runWithContext(
callerCtx context.Context, project workspace.Project,
target deploy.Target, opts UpdateOptions, dryRun bool,
backendClient deploy.BackendClient, validate ValidateFunc) (Plan, *deploy.Snapshot, result.Result) {
// Create an appropriate update info and context.
info := &updateInfo{project: project, target: target}
@ -93,11 +111,11 @@ func (op TestOp) RunWithContext(
}()
// Run the step and its validator.
_, res := op(info, ctx, opts, dryRun)
plan, _, res := op(info, ctx, opts, dryRun)
contract.IgnoreClose(journal)
if dryRun {
return nil, res
return plan, nil, res
}
if validate != nil {
res = validate(project, target, journal.Entries(), firedEvents, res)
@ -107,7 +125,7 @@ func (op TestOp) RunWithContext(
if res == nil && snap != nil {
res = result.WrapIfNonNil(snap.VerifyIntegrity())
}
return snap, res
return nil, snap, res
}
type TestStep struct {

View file

@ -23,7 +23,7 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
)
func Refresh(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (ResourceChanges, result.Result) {
func Refresh(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (Plan, ResourceChanges, result.Result) {
contract.Require(u != nil, "u")
contract.Require(ctx != nil, "ctx")
@ -31,13 +31,13 @@ func Refresh(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (Resou
info, err := newDeploymentContext(u, "refresh", ctx.ParentSpan)
if err != nil {
return nil, result.FromError(err)
return nil, nil, result.FromError(err)
}
defer info.Close()
emitter, err := makeEventEmitter(ctx.Events, u)
if err != nil {
return nil, result.FromError(err)
return nil, nil, result.FromError(err)
}
defer emitter.Close()

View file

@ -145,6 +145,9 @@ type UpdateOptions struct {
// the plugin host to use for this update
Host plugin.Host
// The plan to use for the update, if any.
Plan Plan
}
// ResourceChanges contains the aggregate resource changes by operation type.
@ -164,7 +167,10 @@ func (changes ResourceChanges) HasChanges() bool {
return c > 0
}
func Update(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (ResourceChanges, result.Result) {
// Plan records planned resource changes.
type Plan map[resource.URN]*deploy.ResourcePlan
func Update(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (Plan, ResourceChanges, result.Result) {
contract.Require(u != nil, "update")
contract.Require(ctx != nil, "ctx")
@ -172,13 +178,13 @@ func Update(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (Resour
info, err := newDeploymentContext(u, "update", ctx.ParentSpan)
if err != nil {
return nil, result.FromError(err)
return nil, nil, result.FromError(err)
}
defer info.Close()
emitter, err := makeEventEmitter(ctx.Events, u)
if err != nil {
return nil, result.FromError(err)
return nil, nil, result.FromError(err)
}
defer emitter.Close()
@ -419,7 +425,7 @@ func newUpdateSource(
}
func update(ctx *Context, info *deploymentContext, opts deploymentOptions,
preview bool) (ResourceChanges, result.Result) {
preview bool) (Plan, ResourceChanges, result.Result) {
// Refresh and Import do not execute Policy Packs.
policies := map[string]string{}
@ -444,7 +450,7 @@ func update(ctx *Context, info *deploymentContext, opts deploymentOptions,
deployment, err := newDeployment(ctx, info, opts, preview)
if err != nil {
return nil, result.FromError(err)
return nil, nil, result.FromError(err)
}
defer contract.IgnoreClose(deployment)

View file

@ -153,6 +153,7 @@ type Deployment struct {
target *Target // the deployment target.
prev *Snapshot // the old resource snapshot for comparison.
olds map[resource.URN]*resource.State // a map of all old resources.
resourcePlans map[resource.URN]*ResourcePlan // a map of all planned resource changes, if any.
imports []Import // resources to import, if this is an import deployment.
isImport bool // true if this is an import deployment.
schemaLoader schema.Loader // the schema cache for this deployment, if any.
@ -162,7 +163,8 @@ type Deployment struct {
depGraph *graph.DependencyGraph // the dependency graph of the old snapshot.
providers *providers.Registry // the provider registry for this deployment.
goals *goalMap // the set of resource goals generated by the deployment.
news *resourceMap // the set of new resources generated by the deployment.
news *resourceMap // the set of new resources generated by the deployment
newResourcePlans map[resource.URN]*ResourcePlan // the set of new resource plans.
}
// addDefaultProviders adds any necessary default provider definitions and references to the given snapshot. Version
@ -299,7 +301,7 @@ func buildResourceMap(prev *Snapshot, preview bool) ([]*resource.State, map[reso
//
// Note that a deployment uses internal concurrency and parallelism in various ways, so it must be closed if for some
// reason it isn't carried out to its final conclusion. This will result in cancellation and reclamation of resources.
func NewDeployment(ctx *plugin.Context, target *Target, prev *Snapshot, source Source,
func NewDeployment(ctx *plugin.Context, target *Target, prev *Snapshot, plans map[resource.URN]*ResourcePlan, source Source,
localPolicyPackPaths []string, preview bool, backendClient BackendClient) (*Deployment, error) {
contract.Assert(ctx != nil)
@ -343,6 +345,7 @@ func NewDeployment(ctx *plugin.Context, target *Target, prev *Snapshot, source S
ctx: ctx,
target: target,
prev: prev,
resourcePlans: plans,
olds: olds,
source: source,
localPolicyPackPaths: localPolicyPackPaths,
@ -351,6 +354,7 @@ func NewDeployment(ctx *plugin.Context, target *Target, prev *Snapshot, source S
providers: reg,
goals: newGoals,
news: newResources,
newResourcePlans: map[resource.URN]*ResourcePlan{},
}, nil
}
@ -405,7 +409,7 @@ func (d *Deployment) generateEventURN(event SourceEvent) resource.URN {
}
// Execute executes a deployment to completion, using the given cancellation context and running a preview or update.
func (d *Deployment) Execute(ctx context.Context, opts Options, preview bool) result.Result {
func (d *Deployment) Execute(ctx context.Context, opts Options, preview bool) (map[resource.URN]*ResourcePlan, result.Result) {
deploymentExec := &deploymentExecutor{deployment: d}
return deploymentExec.Execute(ctx, opts, preview)
}

View file

@ -114,7 +114,7 @@ func (ex *deploymentExecutor) reportError(urn resource.URN, err error) {
// Execute executes a deployment to completion, using the given cancellation context and running a preview
// or update.
func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, preview bool) result.Result {
func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, preview bool) (map[resource.URN]*ResourcePlan, result.Result) {
// Set up a goroutine that will signal cancellation to the deployment's plugins if the caller context is cancelled.
// We do not hang this off of the context we create below because we do not want the failure of a single step to
// cause other steps to fail.
@ -141,10 +141,10 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
// Before doing anything else, optionally refresh each resource in the base checkpoint.
if opts.Refresh {
if res := ex.refresh(callerCtx, opts, preview); res != nil {
return res
return nil, res
}
if opts.RefreshOnly {
return nil
return nil, nil
}
}
@ -156,10 +156,10 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
replaceTargetsOpt := createTargetMap(opts.ReplaceTargets)
destroyTargetsOpt := createTargetMap(opts.DestroyTargets)
if res := ex.checkTargets(opts.ReplaceTargets, OpReplace); res != nil {
return res
return nil, res
}
if res := ex.checkTargets(opts.DestroyTargets, OpDelete); res != nil {
return res
return nil, res
}
if (updateTargetsOpt != nil || replaceTargetsOpt != nil) && destroyTargetsOpt != nil {
@ -169,7 +169,7 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
// Begin iterating the source.
src, res := ex.deployment.source.Iterate(callerCtx, opts, ex.deployment)
if res != nil {
return res
return nil, res
}
// Set up a step generator for this deployment.
@ -177,7 +177,7 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
// Retire any pending deletes that are currently present in this deployment.
if res := ex.retirePendingDeletes(callerCtx, opts, preview); res != nil {
return res
return nil, res
}
// Derive a cancellable context for this deployment. We will only cancel this context if some piece of the
@ -268,7 +268,7 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
}
if res != nil && res.IsBail() {
return res
return nil, res
}
// If the step generator and step executor were both successful, then we send all the resources
@ -280,7 +280,7 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
logging.V(4).Infof("deploymentExecutor.Execute(...): error analyzing resources: %v", resErr)
ex.reportError("", resErr)
}
return result.Bail()
return nil, result.Bail()
}
}
@ -289,13 +289,13 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
// TODO(cyrusn): We seem to be losing any information about the original 'res's errors. Should
// we be doing a merge here?
ex.reportExecResult("failed", preview)
return result.Bail()
return nil, result.Bail()
} else if canceled {
ex.reportExecResult("canceled", preview)
return result.Bail()
return nil, result.Bail()
}
return res
return pe.plan.newResourcePlans, res
}
func (ex *deploymentExecutor) performDeletes(
@ -433,9 +433,9 @@ func (ex *deploymentExecutor) retirePendingDeletes(callerCtx context.Context, op
}
// import imports a list of resources into a stack.
func (ex *deploymentExecutor) importResources(callerCtx context.Context, opts Options, preview bool) result.Result {
func (ex *deploymentExecutor) importResources(callerCtx context.Context, opts Options, preview bool) (map[resource.URN]*ResourcePlan, result.Result) {
if len(ex.deployment.imports) == 0 {
return nil
return nil, nil
}
// Create an executor for this import.
@ -461,12 +461,12 @@ func (ex *deploymentExecutor) importResources(callerCtx context.Context, opts Op
} else {
ex.reportExecResult("failed", preview)
}
return result.Bail()
return nil, result.Bail()
} else if canceled {
ex.reportExecResult("canceled", preview)
return result.Bail()
return nil, result.Bail()
}
return nil
return pe.plan.newResourcePlans, nil
}
// refresh refreshes the state of the base checkpoint file for the current deployment in memory.

View file

@ -42,7 +42,7 @@ func TestPendingOperationsDeployment(t *testing.T) {
},
})
_, err := NewDeployment(&plugin.Context{}, &Target{}, snap, &fixedSource{}, nil, false, nil)
_, err := NewDeployment(&plugin.Context{}, &Target{}, snap, nil, &fixedSource{}, nil, false, nil)
if !assert.Error(t, err) {
t.FailNow()
}

View file

@ -0,0 +1,29 @@
package deploy
import "github.com/pulumi/pulumi/sdk/v2/go/common/resource"
// A ResourcePlan represents the planned goal state and resource operations for a single resource. The operations are
// ordered.
type ResourcePlan struct {
Goal *resource.Goal
Ops []StepOp
}
// Partial returns true if the plan is partial (i.e. its inputs properties contain unknown values).
func (rp *ResourcePlan) Partial() bool {
return rp.Goal.Properties.ContainsUnknowns()
}
func (rp *ResourcePlan) completeInputs(programInputs resource.PropertyMap) resource.PropertyMap {
// Find all unknown properties and replace them with their resolved values.
plannedObject := resource.NewObjectProperty(rp.Goal.Properties.DeepCopy())
programObject := resource.NewObjectProperty(programInputs)
for _, path := range plannedObject.FindUnknowns() {
if v, ok := path.Get(programObject); ok {
path.Set(plannedObject, v)
} else {
path.Delete(plannedObject)
}
}
return plannedObject.ObjectValue()
}

View file

@ -1168,6 +1168,26 @@ func (op StepOp) Suffix() string {
return ""
}
// ConstrainedTo returns true if this operation is no more impactful than the constraint.
func (op StepOp) ConstrainedTo(constraint StepOp) bool {
var allowed []StepOp
switch constraint {
case OpSame, OpCreate, OpDelete, OpRead, OpReadReplacement, OpRefresh, OpReadDiscard, OpDiscardReplaced,
OpRemovePendingReplace, OpImport, OpImportReplacement:
allowed = []StepOp{constraint}
case OpUpdate:
allowed = []StepOp{OpSame, OpUpdate}
case OpReplace, OpCreateReplacement, OpDeleteReplaced:
allowed = []StepOp{OpSame, OpUpdate, constraint}
}
for _, candidate := range allowed {
if candidate == op {
return true
}
}
return false
}
// getProvider fetches the provider for the given step.
func getProvider(s Step) (plugin.Provider, error) {
if providers.IsProviderType(s.Type()) {

View file

@ -154,6 +154,29 @@ func (sg *stepGenerator) GenerateSteps(event RegisterResourceEvent) ([]Step, res
contract.Assert(len(steps) == 0)
return nil, res
}
// Check each proposed step against the relevant resource plan, if any, and generate any output resource plans.
for _, s := range steps {
if resourcePlan, ok := sg.plan.resourcePlans[s.URN()]; ok {
if len(resourcePlan.Ops) == 0 {
return nil, result.Errorf("unexpected %v step for resource %v", s.Op(), s.URN())
}
constraint := resourcePlan.Ops[0]
if !s.Op().ConstrainedTo(constraint) {
return nil, result.Errorf("illegal %v step for reource %v: expected a %v step", s.Op(), s.URN(), constraint)
}
resourcePlan.Ops = resourcePlan.Ops[1:]
}
resourcePlan, ok := sg.plan.newResourcePlans[s.URN()]
if !ok {
resourcePlan = &ResourcePlan{Goal: event.Goal()}
sg.plan.newResourcePlans[s.URN()] = resourcePlan
}
resourcePlan.Ops = append(resourcePlan.Ops, s.Op())
}
if !sg.isTargetedUpdate() {
return steps, nil
}
@ -247,6 +270,11 @@ func (sg *stepGenerator) generateSteps(event RegisterResourceEvent) ([]Step, res
inputs = processedInputs
}
// If there is a plan for this resource, finalize its inputs.
if resourcePlan, ok := sg.plan.resourcePlans[urn]; ok {
inputs = resourcePlan.completeInputs(inputs)
}
// Produce a new state object that we'll build up as operations are performed. Ultimately, this is what will
// get serialized into the checkpoint file.
new := resource.NewState(goal.Type, urn, goal.Custom, false, "", inputs, nil, goal.Parent, goal.Protect, false,

View file

@ -646,3 +646,71 @@ const OutputValueSig = "d0e6a833031e9bbcd3f4e8bde6ca49a4"
func IsInternalPropertyKey(key PropertyKey) bool {
return strings.HasPrefix(string(key), "__")
}
// DeepCopy creates a new copy of this property map.
func (m PropertyMap) DeepCopy() PropertyMap {
copy := PropertyMap{}
for k, v := range m {
copy[k] = v.DeepCopy()
}
return copy
}
// DeepCopy creates a new copy of this property.
func (v PropertyValue) DeepCopy() PropertyValue {
switch {
case v.IsArray():
copy := make([]PropertyValue, len(v.ArrayValue()))
for i, v := range v.ArrayValue() {
copy[i] = v.DeepCopy()
}
return NewArrayProperty(copy)
case v.IsObject():
return NewObjectProperty(v.ObjectValue().DeepCopy())
case v.IsSecret():
return MakeSecret(v.SecretValue().Element.DeepCopy())
default:
return v
}
}
// FindUnknowns returns the set of paths to unknown values nested inside this property map.
func (m PropertyMap) FindUnknowns() []PropertyPath {
var paths []PropertyPath
for k, v := range m {
if v.IsComputed() || v.IsOutput() {
paths = append(paths, PropertyPath{string(k)})
} else {
for _, p := range v.FindUnknowns() {
p = append(PropertyPath{string(k)}, p...)
paths = append(paths, p)
}
}
}
return paths
}
// FindUnknowns returns the set of paths to unknown values nested inside this property map.
func (v PropertyValue) FindUnknowns() []PropertyPath {
switch {
case v.IsArray():
var paths []PropertyPath
for i, v := range v.ArrayValue() {
if v.IsComputed() || v.IsOutput() {
paths = append(paths, PropertyPath{i})
} else {
for _, p := range v.FindUnknowns() {
p = append(PropertyPath{i}, p...)
paths = append(paths, p)
}
}
}
return paths
case v.IsObject():
return v.ObjectValue().FindUnknowns()
case v.IsSecret():
return v.SecretValue().Element.FindUnknowns()
default:
return nil
}
}