Snapshot management overhaul and refactor (#1273)

* Refactor the SnapshotManager interface

Lift snapshot management out of the engine by delegating it to the
SnapshotManager implementation in pkg/backend.

* Add an event interface for plugin loads and use that interface to record plugins in the snapshot

* Remove dead code

* Add comments to Events

* Add a number of tests for SnapshotManager

* CR feedback: use a successful bit on 'End' instead of having a separate 'Abort' API

* CR feedback

* CR feedback: register plugins one-at-a-time instead of the entire state at once
Sean Gillespie 2018-04-25 17:20:08 -07:00 committed by GitHub
parent 1994c94b8b
commit 14baf866f6
16 changed files with 854 additions and 249 deletions
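
For orientation, here is a small sketch (illustrative only, not part of the diff) of the call sequence the engine now drives against the interfaces introduced below: BeginMutation announces the intent to perform a step, and End reports the outcome through a success bit rather than a separate Abort API.

package example

import (
	"github.com/pulumi/pulumi/pkg/engine"
	"github.com/pulumi/pulumi/pkg/resource/deploy"
)

// recordStep is a hypothetical helper showing how the engine reports a single
// step to a SnapshotManager under the new interface: it records the intent to
// mutate, then commits the outcome via End's success bit.
func recordStep(sm engine.SnapshotManager, step deploy.Step, applyErr error) error {
	mutation, err := sm.BeginMutation(step)
	if err != nil {
		return err
	}
	// applyErr is the error (if any) produced by actually applying the step.
	return mutation.End(step, applyErr == nil)
}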


@ -46,7 +46,7 @@ func getProjectPlugins() ([]workspace.PluginInfo, error) {
}
projinfo := &engine.Projinfo{Proj: proj, Root: root}
pwd, main, ctx, err := engine.ProjectInfoContext(projinfo, nil, cmdutil.Diag(), nil)
pwd, main, ctx, err := engine.ProjectInfoContext(projinfo, nil, nil, cmdutil.Diag(), nil)
if err != nil {
return nil, err
}


@ -810,7 +810,8 @@ func (b *cloudBackend) runEngineAction(
return err
}
manager := b.newSnapshotManager(u.update, u.tokenSource)
persister := b.newSnapshotPersister(u.update, u.tokenSource)
manager := backend.NewSnapshotManager(stackRef.StackName(), persister, u.GetTarget().Snapshot)
displayEvents := make(chan engine.Event)
displayDone := make(chan bool)


@ -3,59 +3,42 @@
package cloud
import (
"github.com/pulumi/pulumi/pkg/backend"
"github.com/pulumi/pulumi/pkg/backend/cloud/client"
"github.com/pulumi/pulumi/pkg/engine"
"github.com/pulumi/pulumi/pkg/resource/deploy"
"github.com/pulumi/pulumi/pkg/resource/stack"
)
// cloudSnapshotManager persists snapshots to the Pulumi service.
type cloudSnapshotManager struct {
// cloudSnapshotPersister persists snapshots to the Pulumi service.
type cloudSnapshotPersister struct {
update client.UpdateIdentifier // The UpdateIdentifier for this update sequence.
tokenSource *tokenSource // A token source for interacting with the service.
backend *cloudBackend // A backend for communicating with the service
}
// BeginMutation marks the snapshot with an intent to mutate it by invalidating the existing
// saved checkpoint.
func (csm *cloudSnapshotManager) BeginMutation() (engine.SnapshotMutation, error) {
// invalidate the current checkpoint
token, err := csm.tokenSource.GetToken()
func (persister *cloudSnapshotPersister) Invalidate() error {
token, err := persister.tokenSource.GetToken()
if err != nil {
return nil, err
return err
}
if err = csm.backend.client.InvalidateUpdateCheckpoint(csm.update, token); err != nil {
return nil, err
}
return &cloudSnapshotMutation{manager: csm}, nil
return persister.backend.client.InvalidateUpdateCheckpoint(persister.update, token)
}
func (csm *cloudSnapshotManager) Close() error { return nil }
var _ engine.SnapshotManager = (*cloudSnapshotManager)(nil)
// cloudSnapshotMutation represents a single mutating operation on the checkpoint. `End` completes
// the mutation sequence by "patching" the checkpoint with the new snapshot and removing the "dirty"
// bit set by `BeginMutation`.
type cloudSnapshotMutation struct {
manager *cloudSnapshotManager
}
func (csm *cloudSnapshotMutation) End(snapshot *deploy.Snapshot) error {
// Upload the new checkpoint.
token, err := csm.manager.tokenSource.GetToken()
func (persister *cloudSnapshotPersister) Save(snapshot *deploy.Snapshot) error {
token, err := persister.tokenSource.GetToken()
if err != nil {
return err
}
deployment := stack.SerializeDeployment(snapshot)
return csm.manager.backend.client.PatchUpdateCheckpoint(csm.manager.update, deployment, token)
return persister.backend.client.PatchUpdateCheckpoint(persister.update, deployment, token)
}
var _ engine.SnapshotMutation = (*cloudSnapshotMutation)(nil)
var _ backend.SnapshotPersister = (*cloudSnapshotPersister)(nil)
func (cb *cloudBackend) newSnapshotManager(update client.UpdateIdentifier,
tokenSource *tokenSource) *cloudSnapshotManager {
return &cloudSnapshotManager{
func (cb *cloudBackend) newSnapshotPersister(update client.UpdateIdentifier,
tokenSource *tokenSource) *cloudSnapshotPersister {
return &cloudSnapshotPersister{
update: update,
tokenSource: tokenSource,
backend: cb,


@ -225,7 +225,8 @@ func (b *localBackend) performEngineOp(op string, kind backend.UpdateKind,
return err
}
manager := b.newSnapshotManager(stackName)
persister := b.newSnapshotPersister(stackName)
manager := backend.NewSnapshotManager(stackName, persister, update.GetTarget().Snapshot)
events := make(chan engine.Event)


@ -5,7 +5,6 @@ package local
import (
"os"
"github.com/pulumi/pulumi/pkg/engine"
"github.com/pulumi/pulumi/pkg/resource/deploy"
"github.com/pulumi/pulumi/pkg/tokens"
"github.com/pulumi/pulumi/pkg/util/contract"
@ -13,40 +12,29 @@ import (
// localSnapshotManager is a simple SnapshotManager implementation that persists snapshots
// to disk on the local machine.
type localSnapshotManager struct {
type localSnapshotPersister struct {
name tokens.QName
backend *localBackend
}
// BeginMutation does nothing and returns a SnapshotMutation that, when `End`ed,
// saves a new snapshot to disk.
func (sm *localSnapshotManager) BeginMutation() (engine.SnapshotMutation, error) {
return &localSnapshotMutation{manager: sm}, nil
func (sm *localSnapshotPersister) Invalidate() error {
return nil
}
func (sm *localSnapshotManager) Close() error { return nil }
var _ engine.SnapshotManager = (*localSnapshotManager)(nil)
type localSnapshotMutation struct {
manager *localSnapshotManager
}
func (lsm *localSnapshotMutation) End(snapshot *deploy.Snapshot) error {
func (sm *localSnapshotPersister) Save(snapshot *deploy.Snapshot) error {
stack := snapshot.Stack
contract.Assert(lsm.manager.name == stack)
contract.Assert(sm.name == stack)
config, _, _, err := lsm.manager.backend.getStack(stack)
config, _, _, err := sm.backend.getStack(stack)
if err != nil && !os.IsNotExist(err) {
return err
}
_, err = lsm.manager.backend.saveStack(stack, config, snapshot)
_, err = sm.backend.saveStack(stack, config, snapshot)
return err
}
var _ engine.SnapshotMutation = (*localSnapshotMutation)(nil)
func (b *localBackend) newSnapshotManager(stackName tokens.QName) *localSnapshotManager {
return &localSnapshotManager{name: stackName, backend: b}
func (b *localBackend) newSnapshotPersister(stackName tokens.QName) *localSnapshotPersister {
return &localSnapshotPersister{name: stackName, backend: b}
}

pkg/backend/snapshot.go (new file)

@ -0,0 +1,328 @@
package backend
import (
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/pulumi/pulumi/pkg/engine"
"github.com/pulumi/pulumi/pkg/resource"
"github.com/pulumi/pulumi/pkg/resource/deploy"
"github.com/pulumi/pulumi/pkg/tokens"
"github.com/pulumi/pulumi/pkg/util/contract"
"github.com/pulumi/pulumi/pkg/version"
"github.com/pulumi/pulumi/pkg/workspace"
)
// SnapshotPersister is an interface implemented by our backends to provide snapshot
// persistence. In order to fit into our current model, snapshot persisters have two functions:
// saving snapshots and invalidating already-persisted snapshots.
type SnapshotPersister interface {
// Invalidates the last snapshot that was persisted. This is done as the first step
// of performing a mutation on the snapshot. Returns an error if the invalidation failed.
Invalidate() error
// Persists the given snapshot. Returns an error if the persistence failed.
Save(snapshot *deploy.Snapshot) error
}
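// For illustration only (not part of this change): a minimal in-memory
// persister that satisfies the SnapshotPersister contract. Invalidate marks
// the last stored snapshot as dirty; Save replaces it and clears the dirty
// bit. The real implementations are the cloudSnapshotPersister and
// localSnapshotPersister shown earlier in this diff.
type memoryPersister struct {
	valid bool
	snap  *deploy.Snapshot
}

func (p *memoryPersister) Invalidate() error {
	p.valid = false
	return nil
}

func (p *memoryPersister) Save(snapshot *deploy.Snapshot) error {
	p.snap = snapshot
	p.valid = true
	return nil
}

var _ SnapshotPersister = (*memoryPersister)(nil)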
// SnapshotManager is an implementation of engine.SnapshotManager that inspects steps and performs
// mutations on the global snapshot object serially. This implementation maintains two bits of state: the "base"
// snapshot, which is completely immutable and represents the state of the world prior to the application
// of the current plan, and a "new" list of resources, which consists of the resources that were operated upon
// by the current plan.
//
// Important to note is that, although this SnapshotManager is designed to be easily convertible into a thread-safe
// implementation, the code as it is today is *not thread safe*. In particular, it is not legal for there to be
// more than one `SnapshotMutation` active at any point in time. This is because this SnapshotManager invalidates
// the last persisted snapshot in `BeginMutation`. This is designed to match existing behavior and will not
// be the state of things going forward.
//
// The resources stored in the `resources` slice are pointers to resource objects allocated by the engine.
// This is subtle and a little confusing. The reason for this is that the engine directly mutates resource objects
// that it creates and expects those mutations to be persisted directly to the snapshot.
type SnapshotManager struct {
persister SnapshotPersister // The persister responsible for invalidating and persisting the snapshot
baseSnapshot *deploy.Snapshot // The base snapshot for this plan
resources []*resource.State // The list of resources operated upon by this plan
dones map[*resource.State]bool // The set of resources that have been operated upon already by this plan
doVerify bool // If true, verify the snapshot before persisting it
plugins []workspace.PluginInfo // The list of plugins loaded by the plan, to be saved in the manifest
stackName tokens.QName // The name of the stack being updated
mutationRequests chan func() // The queue of mutation requests, to be retired serially by the manager
}
var _ engine.SnapshotManager = (*SnapshotManager)(nil)
func (sm *SnapshotManager) Close() error {
close(sm.mutationRequests)
return nil
}
// If you need to understand what's going on in this file, start here!
//
// mutate is the serialization point for reads and writes of the global snapshot state.
// The given function will be, at the time of its invocation, the only function allowed to
// mutate state within the SnapshotManager.
//
// Serialization is performed by pushing the mutation function onto a channel, where another
// goroutine is polling the channel and executing the mutation functions as they come.
// This function optionally verifies the integrity of the snapshot before and after mutation.
// Immediately after the mutating function is run, the snapshot's manifest is updated and,
// if there are no verification errors, the snapshot is persisted.
//
// You should never observe or mutate the global snapshot without using this function unless
// you have a very good justification.
func (sm *SnapshotManager) mutate(mutator func()) error {
responseChan := make(chan error)
sm.mutationRequests <- func() {
mutator()
snap := sm.snap()
err := sm.persister.Save(snap)
if err == nil && sm.doVerify {
if err = snap.VerifyIntegrity(); err != nil {
err = errors.Wrapf(err, "after mutation of snapshot")
}
}
responseChan <- err
}
return <-responseChan
}
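// The request-channel pattern used by mutate, reduced to its essentials and
// independent of any snapshot types (illustrative only): a single goroutine
// drains a channel of closures, so all work is retired serially, and each
// caller blocks on its own response channel until its closure has run.
type serializer struct {
	requests chan func()
}

func newSerializer() *serializer {
	s := &serializer{requests: make(chan func())}
	go func() {
		for req := range s.requests {
			req()
		}
	}()
	return s
}

func (s *serializer) do(f func() error) error {
	response := make(chan error)
	s.requests <- func() { response <- f() }
	return <-response
}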
// RegisterResourceOutputs handles the registering of outputs on a Step that has already
// completed. This is accomplished by doing an in-place mutation of the resources currently
// resident in the snapshot.
//
// Due to the way this is currently implemented, the engine directly mutates output properties
// on the resource State object that it created. Since we are storing pointers to these objects
// in the `resources` slice, we need only to do a no-op mutation in order to flush these new
// mutations to disk.
//
// Note that this is not thread-safe at all and defeats the purpose of having a `mutate` callback
// entirely, but the hope is that this state of things will not be permanent.
func (sm *SnapshotManager) RegisterResourceOutputs(step deploy.Step) error {
return sm.refresh()
}
// RecordPlugin records that the current plan loaded a plugin and saves it in the snapshot.
func (sm *SnapshotManager) RecordPlugin(plugin workspace.PluginInfo) error {
return sm.mutate(func() {
sm.plugins = append(sm.plugins, plugin)
})
}
// BeginMutation signals to the SnapshotManager that the engine intends to mutate the global snapshot
// by performing the given Step. This function gives the SnapshotManager a chance to record the
// intent to mutate before the mutation occurs.
func (sm *SnapshotManager) BeginMutation(step deploy.Step) (engine.SnapshotMutation, error) {
contract.Require(step != nil, "step != nil")
glog.V(9).Infof("Beginning mutation for step `%s` on resource `%s`", step.Op(), step.URN())
// This is for compatibility with the existing update model used by the service. Invalidating a
// stack sets a bit in a database indicating that the stored snapshot is not valid.
if err := sm.persister.Invalidate(); err != nil {
glog.V(9).Infof("Failed to invalidate snapshot: %s", err.Error())
return nil, err
}
switch step.Op() {
case deploy.OpSame:
return &sameSnapshotMutation{sm}, nil
case deploy.OpCreate, deploy.OpCreateReplacement:
return &createSnapshotMutation{sm}, nil
case deploy.OpUpdate:
return &updateSnapshotMutation{sm}, nil
case deploy.OpDelete, deploy.OpDeleteReplaced:
return &deleteSnapshotMutation{sm}, nil
case deploy.OpReplace:
return &replaceSnapshotMutation{}, nil
}
contract.Failf("unknown StepOp: %s", step.Op())
return nil, nil
}
// All SnapshotMutation implementations in this file follow the same basic formula:
// mark the "old" state as done and mark the "new" state as new. The two special
// cases are Create (where the "old" state does not exist) and Delete (where the "new" state
// does not exist).
//
// Marking a resource state as done prevents it from being persisted to the snapshot in
// the `snap` function. Marking a resource state as new /enables/ it to be persisted to
// the snapshot in `snap`. See the comments in `snap` for more details.
type sameSnapshotMutation struct {
manager *SnapshotManager
}
func (ssm *sameSnapshotMutation) End(step deploy.Step, successful bool) error {
contract.Require(step != nil, "step != nil")
return ssm.manager.mutate(func() {
if successful {
ssm.manager.markDone(step.Old())
ssm.manager.markNew(step.New())
}
})
}
type createSnapshotMutation struct {
manager *SnapshotManager
}
func (csm *createSnapshotMutation) End(step deploy.Step, successful bool) error {
contract.Require(step != nil, "step != nil")
return csm.manager.mutate(func() {
if successful {
// There is some very subtle behind-the-scenes magic here that
// comes into play whenever this create is a CreateReplacement.
//
// Despite intending for the base snapshot to be immutable, the engine
// does in fact mutate it by setting a `Delete` flag on resources
// being replaced as part of a Create-Before-Delete replacement sequence.
// Since we are storing the base snapshot and all resources by reference
// (we have pointers to engine-allocated objects), this transparently
// "just works" for the SnapshotManager.
csm.manager.markNew(step.New())
}
})
}
type updateSnapshotMutation struct {
manager *SnapshotManager
}
func (usm *updateSnapshotMutation) End(step deploy.Step, successful bool) error {
contract.Require(step != nil, "step != nil")
return usm.manager.mutate(func() {
if successful {
usm.manager.markDone(step.Old())
usm.manager.markNew(step.New())
}
})
}
type deleteSnapshotMutation struct {
manager *SnapshotManager
}
func (dsm *deleteSnapshotMutation) End(step deploy.Step, successful bool) error {
contract.Require(step != nil, "step != nil")
return dsm.manager.mutate(func() {
if successful {
contract.Assert(!step.Old().Protect)
dsm.manager.markDone(step.Old())
}
})
}
type replaceSnapshotMutation struct{}
func (rsm *replaceSnapshotMutation) End(step deploy.Step, successful bool) error { return nil }
// refresh does a no-op mutation that forces the SnapshotManager to persist the
// snapshot exactly as it is currently to disk. This is useful when a mutation
// has failed and we do not intend to persist the failed mutation.
func (sm *SnapshotManager) refresh() error {
return sm.mutate(func() {})
}
// markDone marks a resource as having been processed. Resources that have been marked
// in this manner won't be persisted in the snapshot.
func (sm *SnapshotManager) markDone(state *resource.State) {
contract.Assert(state != nil)
sm.dones[state] = true
glog.V(9).Infof("Marked old state snapshot as done: %v", state.URN)
}
// markNew marks a resource as existing in the new snapshot. This occurs on
// successful non-deletion operations where the given state is the new state
// of a resource that will be persisted to the snapshot.
func (sm *SnapshotManager) markNew(state *resource.State) {
contract.Assert(state != nil)
sm.resources = append(sm.resources, state)
glog.V(9).Infof("Appended new state snapshot to be written: %v", state.URN)
}
// snap produces a new Snapshot given the base snapshot and a list of resources that the current
// plan has created.
func (sm *SnapshotManager) snap() *deploy.Snapshot {
// At this point we have two resource DAGs. One of these is the base DAG for this plan; the other is the current DAG
// for this plan. Any resource r may be present in both DAGs. In order to produce a snapshot, we need to merge these
// DAGs such that all resource dependencies are correctly preserved. Conceptually, the merge proceeds as follows:
//
// - Begin with an empty merged DAG.
// - For each resource r in the current DAG, insert r and its outgoing edges into the merged DAG.
// - For each resource r in the base DAG:
// - If r is in the merged DAG, we are done: if the resource is in the merged DAG, it must have been in the
// current DAG, which accurately captures its current dependencies.
// - If r is not in the merged DAG, insert it and its outgoing edges into the merged DAG.
//
// Physically, however, each DAG is represented as list of resources without explicit dependency edges. In place of
// edges, it is assumed that the list represents a valid topological sort of its source DAG. Thus, any resource r at
// index i in a list L must be assumed to be dependent on all resources in L with index j s.t. j < i. Due to this
// representation, we implement the algorithm above as follows to produce a merged list that represents a valid
// topological sort of the merged DAG:
//
// - Begin with an empty merged list.
// - For each resource r in the current list, append r to the merged list. r must be in a correct location in the
// merged list, as its position relative to its assumed dependencies has not changed.
// - For each resource r in the base list:
// - If r is in the merged list, we are done by the logic given in the original algorithm.
// - If r is not in the merged list, append r to the merged list. r must be in a correct location in the merged
// list:
// - If any of r's dependencies were in the current list, they must already be in the merged list and their
// relative order w.r.t. r has not changed.
// - If any of r's dependencies were not in the current list, they must already be in the merged list, as
// they would have been appended to the list before r.
// Start with a copy of the resources produced during the evaluation of the current plan.
resources := make([]*resource.State, len(sm.resources))
copy(resources, sm.resources)
// Append any resources from the base plan that were not produced by the current plan.
if base := sm.baseSnapshot; base != nil {
for _, res := range base.Resources {
if !sm.dones[res] {
resources = append(resources, res)
}
}
}
manifest := deploy.Manifest{
Time: time.Now(),
Version: version.Version,
Plugins: sm.plugins,
}
manifest.Magic = manifest.NewMagic()
return deploy.NewSnapshot(sm.stackName, manifest, resources)
}
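// To make the list merge above concrete, here is the same algorithm written
// over bare URN strings (illustrative only): `current` is the list produced by
// this plan, `base` is the previous snapshot's list, and `dones` plays the
// role of sm.dones, i.e. base entries already replaced or deleted by this plan.
func mergeLists(current, base []string, dones map[string]bool) []string {
	merged := make([]string, len(current))
	copy(merged, current)
	for _, r := range base {
		if !dones[r] {
			merged = append(merged, r)
		}
	}
	return merged
}

// Example: base is [A, B] where B depends on A; this plan touched only B
// (marking the old B done), so the merged list is [B', A]:
//
//   mergeLists([]string{"B'"}, []string{"A", "B"}, map[string]bool{"B": true})
//   // => ["B'", "A"]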
// NewSnapshotManager creates a new SnapshotManager for the given stack name, using the given persister
// and base snapshot.
//
// It is *very important* that the baseSnap pointer refers to the same Snapshot
// given to the engine! The engine will mutate this object and correctness of the
// SnapshotManager depends on being able to observe this mutation. (This is not ideal...)
func NewSnapshotManager(stackName tokens.QName, persister SnapshotPersister,
baseSnap *deploy.Snapshot) *SnapshotManager {
manager := &SnapshotManager{
persister: persister,
baseSnapshot: baseSnap,
stackName: stackName,
dones: make(map[*resource.State]bool),
doVerify: true,
mutationRequests: make(chan func()),
}
go func() {
for request := range manager.mutationRequests {
request()
}
}()
return manager
}
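
A backend that adopts this manager follows the same two lines that appear in the cloud and local backend changes above. Here is a hedged sketch of that wiring, using a hypothetical helper name and assuming the imports already present in pkg/backend/snapshot.go:

// newManagerForUpdate is a hypothetical helper showing how a backend wires a
// SnapshotPersister and the base snapshot into a SnapshotManager before
// handing it to the engine. Closing the returned manager (once the engine
// operation completes) shuts down its mutation goroutine.
func newManagerForUpdate(stack tokens.QName, persister SnapshotPersister,
	base *deploy.Snapshot) (*SnapshotManager, func() error) {

	manager := NewSnapshotManager(stack, persister, base)
	return manager, manager.Close
}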


@ -0,0 +1,369 @@
// Copyright 2016-2018, Pulumi Corporation. All rights reserved.
package backend
import (
"testing"
"time"
"github.com/pulumi/pulumi/pkg/resource"
"github.com/pulumi/pulumi/pkg/resource/deploy"
"github.com/pulumi/pulumi/pkg/tokens"
"github.com/pulumi/pulumi/pkg/version"
"github.com/stretchr/testify/assert"
)
type MockRegisterResourceEvent struct {
deploy.SourceEvent
}
func (m MockRegisterResourceEvent) Goal() *resource.Goal { return nil }
func (m MockRegisterResourceEvent) Done(result *deploy.RegisterResult) {}
type MockStackPersister struct {
Valid bool
SavedSnapshots []*deploy.Snapshot
}
func (m *MockStackPersister) Save(snap *deploy.Snapshot) error {
m.Valid = true
m.SavedSnapshots = append(m.SavedSnapshots, snap)
return nil
}
func (m *MockStackPersister) Invalidate() error {
m.Valid = false
return nil
}
func MockSetup(t *testing.T, name tokens.QName, baseSnap *deploy.Snapshot) (*SnapshotManager, *MockStackPersister) {
err := baseSnap.VerifyIntegrity()
if !assert.NoError(t, err) {
t.FailNow()
}
sp := &MockStackPersister{}
return NewSnapshotManager(name, sp, baseSnap), sp
}
func NewResource(name string, deps ...resource.URN) *resource.State {
return &resource.State{
Type: tokens.Type("test"),
URN: resource.URN(name),
Inputs: make(resource.PropertyMap),
Outputs: make(resource.PropertyMap),
Dependencies: deps,
}
}
func NewSnapshot(name tokens.QName, resources []*resource.State) *deploy.Snapshot {
return deploy.NewSnapshot(name, deploy.Manifest{
Time: time.Now(),
Version: version.Version,
Plugins: nil,
}, resources)
}
func TestIdenticalSames(t *testing.T) {
sameState := NewResource("a-unique-urn")
name := tokens.QName("test")
snap := NewSnapshot(name, []*resource.State{
sameState,
})
manager, sp := MockSetup(t, "test", snap)
// The engine generates a SameStep on sameState.
engineGeneratedSame := NewResource(string(sameState.URN))
same := deploy.NewSameStep(nil, nil, sameState, engineGeneratedSame)
mutation, err := manager.BeginMutation(same)
assert.NoError(t, err)
// The snapshot manager invalidated the stored snapshot
assert.False(t, sp.Valid)
// No mutation was made
assert.Empty(t, sp.SavedSnapshots)
err = mutation.End(same, true)
assert.NoError(t, err)
assert.True(t, sp.Valid)
// Sames `do` cause a snapshot mutation as part of `End`.
assert.NotEmpty(t, sp.SavedSnapshots)
assert.NotEmpty(t, sp.SavedSnapshots[0].Resources)
// Our same resource should be the first entry in the snapshot list.
inSnapshot := sp.SavedSnapshots[0].Resources[0]
assert.Equal(t, sameState.URN, inSnapshot.URN)
}
// This test challenges the naive approach of mutating resources
// that are the targets of Same steps in-place by changing the dependencies
// of two resources in the snapshot, which is perfectly legal in our system
// (and in fact is done by the `dependency_steps` integration test as well).
//
// The correctness of the `snap` function in snapshot.go is tested here.
func TestSamesWithDependencyChanges(t *testing.T) {
resourceA := NewResource("a-unique-urn-resource-a")
resourceB := NewResource("a-unique-urn-resource-b", resourceA.URN)
name := tokens.QName("test")
// The setup: the snapshot contains two resources, A and B, where
// B depends on A. We're going to begin a mutation in which B no longer
// depends on A and appears first in program order.
snap := NewSnapshot(name, []*resource.State{
resourceA,
resourceB,
})
manager, sp := MockSetup(t, "test", snap)
resourceBUpdated := NewResource(string(resourceB.URN))
// note: no dependencies
resourceAUpdated := NewResource(string(resourceA.URN), resourceBUpdated.URN)
// note: now depends on B
// The engine first generates a Same for b:
bSame := deploy.NewSameStep(nil, nil, resourceB, resourceBUpdated)
mutation, err := manager.BeginMutation(bSame)
assert.NoError(t, err)
assert.False(t, sp.Valid)
err = mutation.End(bSame, true)
assert.NoError(t, err)
assert.True(t, sp.Valid)
// The snapshot should now look like this:
// snapshot
// resources
// b
// a
// where b does not depend on anything and neither does a.
firstSnap := sp.SavedSnapshots[0]
assert.Len(t, firstSnap.Resources, 2)
assert.Equal(t, resourceB.URN, firstSnap.Resources[0].URN)
assert.Len(t, firstSnap.Resources[0].Dependencies, 0)
assert.Equal(t, resourceA.URN, firstSnap.Resources[1].URN)
assert.Len(t, firstSnap.Resources[1].Dependencies, 0)
// The engine then generates a Same for a:
aSame := deploy.NewSameStep(nil, nil, resourceA, resourceAUpdated)
mutation, err = manager.BeginMutation(aSame)
assert.NoError(t, err)
assert.False(t, sp.Valid)
err = mutation.End(aSame, true)
assert.NoError(t, err)
assert.True(t, sp.Valid)
// The snapshot should now look like this:
// snapshot
// resources
// b
// a
// where b does not depend on anything and a depends on b.
secondSnap := sp.SavedSnapshots[1]
assert.Len(t, secondSnap.Resources, 2)
assert.Equal(t, resourceB.URN, secondSnap.Resources[0].URN)
assert.Len(t, secondSnap.Resources[0].Dependencies, 0)
assert.Equal(t, resourceA.URN, secondSnap.Resources[1].URN)
assert.Len(t, secondSnap.Resources[1].Dependencies, 1)
assert.Equal(t, resourceB.URN, secondSnap.Resources[1].Dependencies[0])
}
// This test exercises the merge operation with a particularly vexing deployment
// state that was useful in shaking out bugs.
func TestVexingDeployment(t *testing.T) {
// This is the dependency graph we are going for in the base snapshot:
//
// +-+
// +--> |A|
// | +-+
// | ^
// | +-+
// | |B|
// | +-+
// | ^
// | +-+
// +--+ |C| <---+
// +-+ |
// ^ |
// +-+ |
// |D| |
// +-+ |
// |
// +-+ |
// |E| +---+
// +-+
a := NewResource("a")
b := NewResource("b", a.URN)
c := NewResource("c", a.URN, b.URN)
d := NewResource("d", c.URN)
e := NewResource("e", c.URN)
name := tokens.QName("test")
snap := NewSnapshot(name, []*resource.State{
a,
b,
c,
d,
e,
})
manager, sp := MockSetup(t, "test", snap)
// This is the sequence of events that come out of the engine:
// B - Same, depends on nothing
// C - CreateReplacement, depends on B
// C - Replace
// D - Update, depends on new C
// This produces the following dependency graph in the new snapshot:
// +-+
// +---> |B|
// | +++
// | ^
// | +++
// | |C| <----+
// | +-+ |
// | |
// | +-+ |
// +---+ |C| +-------------> A (not in graph!)
// +-+ |
// |
// +-+ |
// |D| +---+
// +-+
//
// Conceptually, this is a plan that deletes A. However, we have not yet observed the
// deletion of A, presumably because the engine can't know for sure that it's been deleted
// until the eval source completes. Of note in this snapshot is that the replaced C is still in the graph,
// because it has not yet been deleted, and its dependency A is not in the graph because it
// has not been seen.
//
// Since axiomatically we assume that steps arrive in a valid topological order of the dependency graph,
// we can logically assume that A is going to be deleted. (If A were not being deleted, it must have been
// the target of a Step that came before C, which depends on it.)
applyStep := func(step deploy.Step) {
mutation, err := manager.BeginMutation(step)
if !assert.NoError(t, err) {
t.FailNow()
}
err = mutation.End(step, true)
if !assert.NoError(t, err) {
t.FailNow()
}
}
// b now depends on nothing
bPrime := NewResource(string(b.URN))
applyStep(deploy.NewSameStep(nil, MockRegisterResourceEvent{}, b, bPrime))
// c now only depends on b
cPrime := NewResource(string(c.URN), bPrime.URN)
// mocking out the behavior of a provider indicating that this resource needs to be deleted
createReplacement := deploy.NewCreateReplacementStep(nil, MockRegisterResourceEvent{}, c, cPrime, nil, true)
replace := deploy.NewReplaceStep(nil, c, cPrime, nil, true)
c.Delete = true
applyStep(createReplacement)
applyStep(replace)
// cPrime now exists, c is now pending deletion
// dPrime now depends on cPrime, which got replaced
dPrime := NewResource(string(d.URN), cPrime.URN)
applyStep(deploy.NewUpdateStep(nil, MockRegisterResourceEvent{}, d, dPrime, nil))
lastSnap := sp.SavedSnapshots[len(sp.SavedSnapshots)-1]
assert.Len(t, lastSnap.Resources, 6)
res := lastSnap.Resources
// Here's what the merged snapshot should look like:
// B should be first, and it should depend on nothing
assert.Equal(t, b.URN, res[0].URN)
assert.Len(t, res[0].Dependencies, 0)
// cPrime should be next, and it should depend on B
assert.Equal(t, c.URN, res[1].URN)
assert.Len(t, res[1].Dependencies, 1)
assert.Equal(t, b.URN, res[1].Dependencies[0])
// d should be next, and it should depend on cPrime
assert.Equal(t, d.URN, res[2].URN)
assert.Len(t, res[2].Dependencies, 1)
assert.Equal(t, c.URN, res[2].Dependencies[0])
// a should be next, and it should depend on nothing
assert.Equal(t, a.URN, res[3].URN)
assert.Len(t, res[3].Dependencies, 0)
// c should be next, it should depend on A and B and should be pending deletion
// this is a critical operation of snap and the crux of this test:
// merge MUST put c after a in the snapshot, despite never having seen a in the current plan
assert.Equal(t, c.URN, res[4].URN)
assert.True(t, res[4].Delete)
assert.Len(t, res[4].Dependencies, 2)
assert.Contains(t, res[4].Dependencies, a.URN)
assert.Contains(t, res[4].Dependencies, b.URN)
// e should be last, it should depend on C and still be live
assert.Equal(t, e.URN, res[5].URN)
assert.Len(t, res[5].Dependencies, 1)
assert.Equal(t, c.URN, res[5].Dependencies[0])
}
func TestDeletion(t *testing.T) {
resourceA := NewResource("a")
name := tokens.QName("test")
snap := NewSnapshot(name, []*resource.State{
resourceA,
})
manager, sp := MockSetup(t, name, snap)
step := deploy.NewDeleteStep(nil, resourceA)
mutation, err := manager.BeginMutation(step)
if !assert.NoError(t, err) {
t.FailNow()
}
err = mutation.End(step, true)
if !assert.NoError(t, err) {
t.FailNow()
}
// the end mutation should mark the resource as "done".
// snap should then not put resourceA in the merged snapshot, since it has been deleted.
lastSnap := sp.SavedSnapshots[len(sp.SavedSnapshots)-1]
assert.Len(t, lastSnap.Resources, 0)
}
func TestFailedDelete(t *testing.T) {
resourceA := NewResource("a")
name := tokens.QName("test")
snap := NewSnapshot(name, []*resource.State{
resourceA,
})
manager, sp := MockSetup(t, name, snap)
step := deploy.NewDeleteStep(nil, resourceA)
mutation, err := manager.BeginMutation(step)
if !assert.NoError(t, err) {
t.FailNow()
}
err = mutation.End(step, false /* successful */)
if !assert.NoError(t, err) {
t.FailNow()
}
// since we marked the mutation as not successful, the snapshot should still contain
// the resource we failed to delete.
lastSnap := sp.SavedSnapshots[len(sp.SavedSnapshots)-1]
assert.Len(t, lastSnap.Resources, 1)
assert.Equal(t, resourceA.URN, lastSnap.Resources[0].URN)
}


@ -18,8 +18,8 @@ import (
)
// ProjectInfoContext returns information about the current project, including its pwd, main, and plugin context.
func ProjectInfoContext(projinfo *Projinfo, config plugin.ConfigSource, diag diag.Sink,
tracingSpan opentracing.Span) (string, string, *plugin.Context, error) {
func ProjectInfoContext(projinfo *Projinfo, config plugin.ConfigSource, pluginEvents plugin.Events,
diag diag.Sink, tracingSpan opentracing.Span) (string, string, *plugin.Context, error) {
contract.Require(projinfo != nil, "projinfo")
// If the package contains an override for the main entrypoint, use it.
@ -29,7 +29,7 @@ func ProjectInfoContext(projinfo *Projinfo, config plugin.ConfigSource, diag dia
}
// Create a context for plugins.
ctx, err := plugin.NewContext(diag, nil, config, pwd, tracingSpan)
ctx, err := plugin.NewContext(diag, nil, config, pluginEvents, pwd, tracingSpan)
if err != nil {
return "", "", nil, err
}
@ -68,10 +68,11 @@ type planOptions struct {
// creates resources to compare against the current checkpoint state (e.g., by evaluating a program, etc).
SourceFunc planSourceFunc
SkipOutputs bool // true if we should skip printing outputs separately.
DOT bool // true if we should print the DOT file for this plan.
Events eventEmitter // the channel to write events from the engine to.
Diag diag.Sink // the sink to use for diag'ing.
SkipOutputs bool // true if we should skip printing outputs separately.
DOT bool // true if we should print the DOT file for this plan.
Events eventEmitter // the channel to write events from the engine to.
Diag diag.Sink // the sink to use for diag'ing.
PluginEvents plugin.Events // an optional listener for plugin events
}
// planSourceFunc is a callback that will be used to prepare for, and evaluate, the "new" state for a stack.
@ -91,7 +92,7 @@ func plan(ctx *planContext, opts planOptions, dryRun bool) (*planResult, error)
contract.Assert(proj != nil)
contract.Assert(target != nil)
projinfo := &Projinfo{Proj: proj, Root: ctx.Update.GetRoot()}
pwd, main, plugctx, err := ProjectInfoContext(projinfo, target, opts.Diag, ctx.TracingSpan)
pwd, main, plugctx, err := ProjectInfoContext(projinfo, target, opts.PluginEvents, opts.Diag, ctx.TracingSpan)
if err != nil {
return nil, err
}


@ -27,6 +27,7 @@ func Refresh(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (Resou
SkipOutputs: true, // refresh is exclusively about outputs
SourceFunc: newRefreshSource,
Events: emitter,
PluginEvents: &pluginActions{ctx},
Diag: newEventSink(emitter),
}, dryRun)
}


@ -4,6 +4,7 @@ import (
"io"
"github.com/pulumi/pulumi/pkg/resource/deploy"
"github.com/pulumi/pulumi/pkg/workspace"
)
// SnapshotManager is responsible for maintaining the in-memory representation
@ -15,7 +16,14 @@ type SnapshotManager interface {
// snapshot. It provides the step that it intends to execute. Based on that step, BeginMutation
// will record this intent in the global snapshot and return a `SnapshotMutation` that, when ended,
// will complete the transaction.
BeginMutation() (SnapshotMutation, error)
BeginMutation(step deploy.Step) (SnapshotMutation, error)
// RegisterResourceOutputs registers the set of resource outputs generated by performing the
// given step. These outputs are persisted in the snapshot.
RegisterResourceOutputs(step deploy.Step) error
// RecordPlugin records that the current plan loaded a plugin and saves it in the snapshot.
RecordPlugin(plugin workspace.PluginInfo) error
}
// SnapshotMutation represents an outstanding mutation that is yet to be completed. When the engine completes
@ -23,5 +31,5 @@ type SnapshotManager interface {
type SnapshotMutation interface {
// End terminates the transaction and commits the results to the snapshot, returning an error if this
// failed to complete.
End(snapshot *deploy.Snapshot) error
End(step deploy.Step, successful bool) error
}


@ -53,6 +53,7 @@ func Update(u UpdateInfo, ctx *Context, opts UpdateOptions, dryRun bool) (Resour
SourceFunc: newUpdateSource,
Events: emitter,
Diag: newEventSink(emitter),
PluginEvents: &pluginActions{ctx},
}, dryRun)
}
@ -135,6 +136,16 @@ func update(ctx *Context, info *planContext, opts planOptions, dryRun bool) (Res
return resourceChanges, nil
}
// pluginActions listens for plugin events and persists the set of loaded plugins
// to the snapshot.
type pluginActions struct {
Context *Context
}
func (p *pluginActions) OnPluginLoad(loadedPlug workspace.PluginInfo) error {
return p.Context.SnapshotManager.RecordPlugin(loadedPlug)
}
// updateActions pretty-prints the plan application process as it goes.
type updateActions struct {
Context *Context
@ -163,7 +174,7 @@ func (acts *updateActions) OnResourceStepPre(step deploy.Step) (interface{}, err
acts.Opts.Events.resourcePreEvent(step, false /*planning*/, acts.Opts.Debug)
// Inform the snapshot service that we are about to perform a step.
return acts.Context.SnapshotManager.BeginMutation()
return acts.Context.SnapshotManager.BeginMutation(step)
}
func (acts *updateActions) OnResourceStepPost(ctx interface{},
@ -203,8 +214,9 @@ func (acts *updateActions) OnResourceStepPost(ctx interface{},
}
// Write out the current snapshot. Note that even if a failure has occurred, we should still have a
// safe checkpoint. Note that any error that occurs when writing the checkpoint trumps the error reported above.
return ctx.(SnapshotMutation).End(step.Iterator().Snap())
// safe checkpoint. Note that any error that occurs when writing the checkpoint trumps the error
// reported above.
return ctx.(SnapshotMutation).End(step, err == nil)
}
func (acts *updateActions) OnResourceOutputs(step deploy.Step) error {
@ -214,10 +226,5 @@ func (acts *updateActions) OnResourceOutputs(step deploy.Step) error {
// There's a chance there are new outputs that weren't written out last time.
// We need to perform another snapshot write to ensure they get written out.
mutation, err := acts.Context.SnapshotManager.BeginMutation()
if err != nil {
return err
}
return mutation.End(step.Iterator().Snap())
return acts.Context.SnapshotManager.RegisterResourceOutputs(step)
}


@ -4,7 +4,6 @@ package deploy
import (
"reflect"
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
@ -14,8 +13,6 @@ import (
"github.com/pulumi/pulumi/pkg/resource/plugin"
"github.com/pulumi/pulumi/pkg/tokens"
"github.com/pulumi/pulumi/pkg/util/contract"
"github.com/pulumi/pulumi/pkg/version"
"github.com/pulumi/pulumi/pkg/workspace"
)
// Options controls the planning and deployment process.
@ -64,7 +61,6 @@ type PlanSummary interface {
Deletes() map[resource.URN]bool
Sames() map[resource.URN]bool
Resources() []*resource.State
Snap() *Snapshot
}
// PlanIterator can be used to step through and/or execute a plan's proposed actions.
@ -415,15 +411,15 @@ func (iter *PlanIterator) makeRegisterResourceSteps(e RegisterResourceEvent) ([]
if diff.DeleteBeforeReplace {
return []Step{
NewDeleteReplacementStep(iter, old, false),
NewReplaceStep(iter, old, new, diff.ReplaceKeys, false),
NewCreateReplacementStep(iter, e, old, new, diff.ReplaceKeys, false),
NewDeleteReplacementStep(iter.p, old, false),
NewReplaceStep(iter.p, old, new, diff.ReplaceKeys, false),
NewCreateReplacementStep(iter.p, e, old, new, diff.ReplaceKeys, false),
}, nil
}
return []Step{
NewCreateReplacementStep(iter, e, old, new, diff.ReplaceKeys, true),
NewReplaceStep(iter, old, new, diff.ReplaceKeys, true),
NewCreateReplacementStep(iter.p, e, old, new, diff.ReplaceKeys, true),
NewReplaceStep(iter.p, old, new, diff.ReplaceKeys, true),
// note that the delete step is generated "later" on, after all creates/updates finish.
}, nil
}
@ -433,7 +429,7 @@ func (iter *PlanIterator) makeRegisterResourceSteps(e RegisterResourceEvent) ([]
if glog.V(7) {
glog.V(7).Infof("Planner decided to update '%v' (oldprops=%v inputs=%v", urn, oldInputs, new.Inputs)
}
return []Step{NewUpdateStep(iter, e, old, new, diff.StableKeys)}, nil
return []Step{NewUpdateStep(iter.p, e, old, new, diff.StableKeys)}, nil
}
// No need to update anything, the properties didn't change.
@ -441,13 +437,13 @@ func (iter *PlanIterator) makeRegisterResourceSteps(e RegisterResourceEvent) ([]
if glog.V(7) {
glog.V(7).Infof("Planner decided not to update '%v' (same) (inputs=%v)", urn, new.Inputs)
}
return []Step{NewSameStep(iter, e, old, new)}, nil
return []Step{NewSameStep(iter.p, e, old, new)}, nil
}
// Otherwise, the resource isn't in the old map, so it must be a resource creation.
iter.creates[urn] = true
glog.V(7).Infof("Planner decided to create '%v' (inputs=%v)", urn, new.Inputs)
return []Step{NewCreateStep(iter, e, new)}, nil
return []Step{NewCreateStep(iter.p, e, new)}, nil
}
// getResourcePropertyStates returns the properties, inputs, outputs, and new resource state, given a goal state.
@ -532,11 +528,11 @@ func (iter *PlanIterator) computeDeletes() []Step {
if res.Delete {
glog.V(7).Infof("Planner decided to delete '%v' due to replacement", res.URN)
iter.deletes[res.URN] = true
dels = append(dels, NewDeleteReplacementStep(iter, res, true))
dels = append(dels, NewDeleteReplacementStep(iter.p, res, true))
} else if !iter.sames[res.URN] && !iter.updates[res.URN] && !iter.replaces[res.URN] {
glog.V(7).Infof("Planner decided to delete '%v'", res.URN)
iter.deletes[res.URN] = true
dels = append(dels, NewDeleteStep(iter, res))
dels = append(dels, NewDeleteStep(iter.p, res))
}
}
}
@ -553,89 +549,6 @@ func (iter *PlanIterator) nextDeleteStep() Step {
return nil
}
// Snap returns a fresh snapshot that takes into account everything that has happened up till this point. Namely, if a
// failure happens partway through, the untouched snapshot elements will be retained, while any updates will be
// preserved. If no failure happens, the snapshot naturally reflects the final state of all resources.
func (iter *PlanIterator) Snap() *Snapshot {
// At this point we have two resource DAGs. One of these is the base DAG for this plan; the other is the current DAG
// for this plan. Any resource r may be present in both DAGs. In order to produce a snapshot, we need to merge these
// DAGs such that all resource dependencies are correctly preserved. Conceptually, the merge proceeds as follows:
//
// - Begin with an empty merged DAG.
// - For each resource r in the current DAG, insert r and its outgoing edges into the merged DAG.
// - For each resource r in the base DAG:
// - If r is in the merged DAG, we are done: if the resource is in the merged DAG, it must have been in the
// current DAG, which accurately captures its current dependencies.
// - If r is not in the merged DAG, insert it and its outgoing edges into the merged DAG.
//
// Physically, however, each DAG is represented as list of resources without explicit dependency edges. In place of
// edges, it is assumed that the list represents a valid topological sort of its source DAG. Thus, any resource r at
// index i in a list L must be assumed to be dependent on all resources in L with index j s.t. j < i. Due to this
// representation, we implement the algorithm above as follows to produce a merged list that represents a valid
// topological sort of the merged DAG:
//
// - Begin with an empty merged list.
// - For each resource r in the current list, append r to the merged list. r must be in a correct location in the
// merged list, as its position relative to its assumed dependencies has not changed.
// - For each resource r in the base list:
// - If r is in the merged list, we are done by the logic given in the original algorithm.
// - If r is not in the merged list, append r to the merged list. r must be in a correct location in the merged
// list:
// - If any of r's dependencies were in the current list, they must already be in the merged list and their
// relative order w.r.t. r has not changed.
// - If any of r's dependencies were not in the current list, they must already be in the merged list, as
// they would have been appended to the list before r.
// Start with a copy of the resources produced during the evaluation of the current plan.
resources := make([]*resource.State, len(iter.resources))
copy(resources, iter.resources)
// If the plan has not finished executing, append any resources from the base plan that were not produced by the
// current plan.
if !iter.done {
if prev := iter.p.prev; prev != nil {
for _, res := range prev.Resources {
if !iter.dones[res] {
resources = append(resources, res)
}
}
}
}
// Now produce a manifest and snapshot.
v, plugs := iter.SnapVersions()
manifest := Manifest{
Time: time.Now(),
Version: v,
Plugins: plugs,
}
manifest.Magic = manifest.NewMagic()
return NewSnapshot(iter.p.Target().Name, manifest, resources)
}
// SnapVersions returns all versions used in the generation of this snapshot. Note that no attempt is made to
// "merge" with old version information. So, if a checkpoint doesn't end up loading all of the possible plugins
// it could ever load -- e.g., due to a failure -- there will be some resources in the checkpoint snapshot that
// were loaded by plugins that never got loaded this time around. In other words, this list is not stable.
func (iter *PlanIterator) SnapVersions() (string, []workspace.PluginInfo) {
return version.Version, iter.p.ctx.Host.ListPlugins()
}
// MarkStateSnapshot marks an old state snapshot as being processed. This is done to recover from failures partway
// through the application of a deployment plan. Any old state that has not yet been recovered needs to be kept.
func (iter *PlanIterator) MarkStateSnapshot(state *resource.State) {
contract.Assert(state != nil)
iter.dones[state] = true
glog.V(9).Infof("Marked old state snapshot as done: %v", state.URN)
}
// AppendStateSnapshot appends a resource's state to the current snapshot.
func (iter *PlanIterator) AppendStateSnapshot(state *resource.State) {
contract.Assert(state != nil)
iter.resources = append(iter.resources, state)
glog.V(9).Infof("Appended new state snapshot to be written: %v", state.URN)
}
// Provider fetches the provider for a given resource type, possibly lazily allocating the plugins for it. If a
// provider could not be found, or an error occurred while creating it, a non-nil error is returned.
func (iter *PlanIterator) Provider(t tokens.Type) (plugin.Provider, error) {


@ -23,7 +23,7 @@ import (
func TestNullPlan(t *testing.T) {
t.Parallel()
ctx, err := plugin.NewContext(cmdutil.Diag(), nil, nil, "", nil)
ctx, err := plugin.NewContext(cmdutil.Diag(), nil, nil, nil, "", nil)
assert.Nil(t, err)
targ := &Target{Name: tokens.QName("null")}
prev := NewSnapshot(targ.Name, Manifest{}, nil)
@ -44,7 +44,7 @@ func TestErrorPlan(t *testing.T) {
// First trigger an error from Iterate:
{
ctx, err := plugin.NewContext(cmdutil.Diag(), nil, nil, "", nil)
ctx, err := plugin.NewContext(cmdutil.Diag(), nil, nil, nil, "", nil)
assert.Nil(t, err)
targ := &Target{Name: tokens.QName("errs")}
prev := NewSnapshot(targ.Name, Manifest{}, nil)
@ -59,7 +59,7 @@ func TestErrorPlan(t *testing.T) {
// Next trigger an error from Next:
{
ctx, err := plugin.NewContext(cmdutil.Diag(), nil, nil, "", nil)
ctx, err := plugin.NewContext(cmdutil.Diag(), nil, nil, nil, "", nil)
assert.Nil(t, err)
targ := &Target{Name: tokens.QName("errs")}
prev := NewSnapshot(targ.Name, Manifest{}, nil)
@ -129,7 +129,7 @@ func TestBasicCRUDPlan(t *testing.T) {
// we don't actually execute the plan, so there's no need to implement the other functions.
}, nil
},
}, nil, "", nil)
}, nil, nil, "", nil)
assert.Nil(t, err)
// Setup a fake namespace/target combination.


@ -23,14 +23,12 @@ type Step interface {
New() *resource.State // the state of the resource after performing this step.
Res() *resource.State // the latest state for the resource that is known (worst case, old).
Logical() bool // true if this step represents a logical operation in the program.
Plan() *Plan // the owning plan.
Iterator() *PlanIterator // the current plan iterator.
Plan() *Plan // the owning plan.
}
// SameStep is a mutating step that does nothing.
type SameStep struct {
iter *PlanIterator // the current plan iteration.
plan *Plan // the current plan.
reg RegisterResourceEvent // the registration intent to convey a URN back to.
old *resource.State // the state of the resource before this step.
new *resource.State // the state of the resource after this step.
@ -38,7 +36,7 @@ type SameStep struct {
var _ Step = (*SameStep)(nil)
func NewSameStep(iter *PlanIterator, reg RegisterResourceEvent, old *resource.State, new *resource.State) Step {
func NewSameStep(plan *Plan, reg RegisterResourceEvent, old *resource.State, new *resource.State) Step {
contract.Assert(old != nil)
contract.Assert(old.URN != "")
contract.Assert(old.ID != "" || !old.Custom)
@ -48,41 +46,34 @@ func NewSameStep(iter *PlanIterator, reg RegisterResourceEvent, old *resource.St
contract.Assert(new.ID == "")
contract.Assert(!new.Delete)
return &SameStep{
iter: iter,
plan: plan,
reg: reg,
old: old,
new: new,
}
}
func (s *SameStep) Op() StepOp { return OpSame }
func (s *SameStep) Plan() *Plan { return s.iter.p }
func (s *SameStep) Iterator() *PlanIterator { return s.iter }
func (s *SameStep) Type() tokens.Type { return s.old.Type }
func (s *SameStep) URN() resource.URN { return s.old.URN }
func (s *SameStep) Old() *resource.State { return s.old }
func (s *SameStep) New() *resource.State { return s.new }
func (s *SameStep) Res() *resource.State { return s.new }
func (s *SameStep) Logical() bool { return true }
func (s *SameStep) Op() StepOp { return OpSame }
func (s *SameStep) Plan() *Plan { return s.plan }
func (s *SameStep) Type() tokens.Type { return s.old.Type }
func (s *SameStep) URN() resource.URN { return s.old.URN }
func (s *SameStep) Old() *resource.State { return s.old }
func (s *SameStep) New() *resource.State { return s.new }
func (s *SameStep) Res() *resource.State { return s.new }
func (s *SameStep) Logical() bool { return true }
func (s *SameStep) Apply(preview bool) (resource.Status, error) {
// Retain the URN, ID, and outputs:
s.new.URN = s.old.URN
s.new.ID = s.old.ID
s.new.Outputs = s.old.Outputs
if !preview {
s.iter.MarkStateSnapshot(s.old)
s.iter.AppendStateSnapshot(s.new)
}
s.reg.Done(&RegisterResult{State: s.new, Stable: true})
return resource.StatusOK, nil
}
// CreateStep is a mutating step that creates an entirely new resource.
type CreateStep struct {
iter *PlanIterator // the current plan iteration.
plan *Plan // the current plan.
reg RegisterResourceEvent // the registration intent to convey a URN back to.
old *resource.State // the state of the existing resource (only for replacements).
new *resource.State // the state of the resource after this step.
@ -93,20 +84,20 @@ type CreateStep struct {
var _ Step = (*CreateStep)(nil)
func NewCreateStep(iter *PlanIterator, reg RegisterResourceEvent, new *resource.State) Step {
func NewCreateStep(plan *Plan, reg RegisterResourceEvent, new *resource.State) Step {
contract.Assert(reg != nil)
contract.Assert(new != nil)
contract.Assert(new.URN != "")
contract.Assert(new.ID == "")
contract.Assert(!new.Delete)
return &CreateStep{
iter: iter,
plan: plan,
reg: reg,
new: new,
}
}
func NewCreateReplacementStep(iter *PlanIterator, reg RegisterResourceEvent,
func NewCreateReplacementStep(plan *Plan, reg RegisterResourceEvent,
old *resource.State, new *resource.State, keys []resource.PropertyKey, pendingDelete bool) Step {
contract.Assert(reg != nil)
contract.Assert(old != nil)
@ -119,7 +110,7 @@ func NewCreateReplacementStep(iter *PlanIterator, reg RegisterResourceEvent,
contract.Assert(!new.Delete)
contract.Assert(old.Type == new.Type)
return &CreateStep{
iter: iter,
plan: plan,
reg: reg,
old: old,
new: new,
@ -135,8 +126,7 @@ func (s *CreateStep) Op() StepOp {
}
return OpCreate
}
func (s *CreateStep) Plan() *Plan { return s.iter.p }
func (s *CreateStep) Iterator() *PlanIterator { return s.iter }
func (s *CreateStep) Plan() *Plan { return s.plan }
func (s *CreateStep) Type() tokens.Type { return s.new.Type }
func (s *CreateStep) URN() resource.URN { return s.new.URN }
func (s *CreateStep) Old() *resource.State { return s.old }
@ -147,7 +137,7 @@ func (s *CreateStep) Logical() bool { return !s.replacing }
func (s *CreateStep) Apply(preview bool) (resource.Status, error) {
if !preview {
if s.new.Custom && !s.iter.p.IsRefresh() {
if s.new.Custom && !s.plan.IsRefresh() {
// Invoke the Create RPC function for this provider:
prov, err := getProvider(s)
if err != nil {
@ -163,8 +153,6 @@ func (s *CreateStep) Apply(preview bool) (resource.Status, error) {
s.new.ID = id
s.new.Outputs = outs
}
s.iter.AppendStateSnapshot(s.new)
}
// Mark the old resource as pending deletion if necessary.
@ -178,31 +166,31 @@ func (s *CreateStep) Apply(preview bool) (resource.Status, error) {
// DeleteStep is a mutating step that deletes an existing resource.
type DeleteStep struct {
iter *PlanIterator // the current plan iteration.
plan *Plan // the current plan.
old *resource.State // the state of the existing resource.
replacing bool // true if part of a replacement.
}
var _ Step = (*DeleteStep)(nil)
func NewDeleteStep(iter *PlanIterator, old *resource.State) Step {
func NewDeleteStep(plan *Plan, old *resource.State) Step {
contract.Assert(old != nil)
contract.Assert(old.URN != "")
contract.Assert(old.ID != "" || !old.Custom)
contract.Assert(!old.Delete)
return &DeleteStep{
iter: iter,
plan: plan,
old: old,
}
}
func NewDeleteReplacementStep(iter *PlanIterator, old *resource.State, pendingDelete bool) Step {
func NewDeleteReplacementStep(plan *Plan, old *resource.State, pendingDelete bool) Step {
contract.Assert(old != nil)
contract.Assert(old.URN != "")
contract.Assert(old.ID != "" || !old.Custom)
contract.Assert(!pendingDelete || old.Delete)
return &DeleteStep{
iter: iter,
plan: plan,
old: old,
replacing: true,
}
@ -214,14 +202,13 @@ func (s *DeleteStep) Op() StepOp {
}
return OpDelete
}
func (s *DeleteStep) Plan() *Plan { return s.iter.p }
func (s *DeleteStep) Iterator() *PlanIterator { return s.iter }
func (s *DeleteStep) Type() tokens.Type { return s.old.Type }
func (s *DeleteStep) URN() resource.URN { return s.old.URN }
func (s *DeleteStep) Old() *resource.State { return s.old }
func (s *DeleteStep) New() *resource.State { return nil }
func (s *DeleteStep) Res() *resource.State { return s.old }
func (s *DeleteStep) Logical() bool { return !s.replacing }
func (s *DeleteStep) Plan() *Plan { return s.plan }
func (s *DeleteStep) Type() tokens.Type { return s.old.Type }
func (s *DeleteStep) URN() resource.URN { return s.old.URN }
func (s *DeleteStep) Old() *resource.State { return s.old }
func (s *DeleteStep) New() *resource.State { return nil }
func (s *DeleteStep) Res() *resource.State { return s.old }
func (s *DeleteStep) Logical() bool { return !s.replacing }
func (s *DeleteStep) Apply(preview bool) (resource.Status, error) {
// Refuse to delete protected resources.
@ -231,7 +218,7 @@ func (s *DeleteStep) Apply(preview bool) (resource.Status, error) {
}
if !preview {
if s.old.Custom && !s.iter.p.IsRefresh() {
if s.old.Custom && !s.plan.IsRefresh() {
// Invoke the Delete RPC function for this provider:
prov, err := getProvider(s)
if err != nil {
@ -241,8 +228,6 @@ func (s *DeleteStep) Apply(preview bool) (resource.Status, error) {
return rst, err
}
}
s.iter.MarkStateSnapshot(s.old)
}
return resource.StatusOK, nil
@ -250,7 +235,7 @@ func (s *DeleteStep) Apply(preview bool) (resource.Status, error) {
// UpdateStep is a mutating step that updates an existing resource's state.
type UpdateStep struct {
iter *PlanIterator // the current plan iteration.
plan *Plan // the current plan.
reg RegisterResourceEvent // the registration intent to convey a URN back to.
old *resource.State // the state of the existing resource.
new *resource.State // the newly computed state of the resource after updating.
@ -259,7 +244,7 @@ type UpdateStep struct {
var _ Step = (*UpdateStep)(nil)
func NewUpdateStep(iter *PlanIterator, reg RegisterResourceEvent, old *resource.State,
func NewUpdateStep(plan *Plan, reg RegisterResourceEvent, old *resource.State,
new *resource.State, stables []resource.PropertyKey) Step {
contract.Assert(old != nil)
contract.Assert(old.URN != "")
@ -271,7 +256,7 @@ func NewUpdateStep(iter *PlanIterator, reg RegisterResourceEvent, old *resource.
contract.Assert(!new.Delete)
contract.Assert(old.Type == new.Type)
return &UpdateStep{
iter: iter,
plan: plan,
reg: reg,
old: old,
new: new,
@ -279,15 +264,14 @@ func NewUpdateStep(iter *PlanIterator, reg RegisterResourceEvent, old *resource.
}
}
func (s *UpdateStep) Op() StepOp { return OpUpdate }
func (s *UpdateStep) Plan() *Plan { return s.iter.p }
func (s *UpdateStep) Iterator() *PlanIterator { return s.iter }
func (s *UpdateStep) Type() tokens.Type { return s.old.Type }
func (s *UpdateStep) URN() resource.URN { return s.old.URN }
func (s *UpdateStep) Old() *resource.State { return s.old }
func (s *UpdateStep) New() *resource.State { return s.new }
func (s *UpdateStep) Res() *resource.State { return s.new }
func (s *UpdateStep) Logical() bool { return true }
func (s *UpdateStep) Op() StepOp { return OpUpdate }
func (s *UpdateStep) Plan() *Plan { return s.plan }
func (s *UpdateStep) Type() tokens.Type { return s.old.Type }
func (s *UpdateStep) URN() resource.URN { return s.old.URN }
func (s *UpdateStep) Old() *resource.State { return s.old }
func (s *UpdateStep) New() *resource.State { return s.new }
func (s *UpdateStep) Res() *resource.State { return s.new }
func (s *UpdateStep) Logical() bool { return true }
func (s *UpdateStep) Apply(preview bool) (resource.Status, error) {
// Always propagate the URN and ID, even in previews and refreshes.
@ -295,7 +279,7 @@ func (s *UpdateStep) Apply(preview bool) (resource.Status, error) {
s.new.ID = s.old.ID
if !preview {
if s.new.Custom && !s.iter.p.IsRefresh() {
if s.new.Custom && !s.plan.IsRefresh() {
// Invoke the Update RPC function for this provider:
prov, err := getProvider(s)
if err != nil {
@ -311,10 +295,6 @@ func (s *UpdateStep) Apply(preview bool) (resource.Status, error) {
// Now copy any output state back in case the update triggered cascading updates to other properties.
s.new.Outputs = outs
}
// Mark the old state as having been processed, and add the new state.
s.iter.MarkStateSnapshot(s.old)
s.iter.AppendStateSnapshot(s.new)
}
// Finally, mark this operation as complete.
@ -326,7 +306,7 @@ func (s *UpdateStep) Apply(preview bool) (resource.Status, error) {
// a creation of the new resource, any number of intervening updates of dependents to the new resource, and then
// a deletion of the now-replaced old resource. This logical step is primarily here for tools and visualization.
type ReplaceStep struct {
iter *PlanIterator // the current plan iteration.
plan *Plan // the current plan.
old *resource.State // the state of the existing resource.
new *resource.State // the new state snapshot.
keys []resource.PropertyKey // the keys causing replacement.
@ -335,7 +315,7 @@ type ReplaceStep struct {
var _ Step = (*ReplaceStep)(nil)
func NewReplaceStep(iter *PlanIterator, old *resource.State, new *resource.State,
func NewReplaceStep(plan *Plan, old *resource.State, new *resource.State,
keys []resource.PropertyKey, pendingDelete bool) Step {
contract.Assert(old != nil)
contract.Assert(old.URN != "")
@ -346,7 +326,7 @@ func NewReplaceStep(iter *PlanIterator, old *resource.State, new *resource.State
contract.Assert(new.ID == "")
contract.Assert(!new.Delete)
return &ReplaceStep{
iter: iter,
plan: plan,
old: old,
new: new,
keys: keys,
@ -355,8 +335,7 @@ func NewReplaceStep(iter *PlanIterator, old *resource.State, new *resource.State
}
func (s *ReplaceStep) Op() StepOp { return OpReplace }
func (s *ReplaceStep) Plan() *Plan { return s.iter.p }
func (s *ReplaceStep) Iterator() *PlanIterator { return s.iter }
func (s *ReplaceStep) Plan() *Plan { return s.plan }
func (s *ReplaceStep) Type() tokens.Type { return s.old.Type }
func (s *ReplaceStep) URN() resource.URN { return s.old.URN }
func (s *ReplaceStep) Old() *resource.State { return s.old }


@ -23,7 +23,8 @@ type Context struct {
// NewContext allocates a new context with a given sink and host. Note that the host is "owned" by this context from
// here forwards, such that when the context's resources are reclaimed, so too are the host's.
func NewContext(d diag.Sink, host Host, cfg ConfigSource, pwd string, parentSpan opentracing.Span) (*Context, error) {
func NewContext(d diag.Sink, host Host, cfg ConfigSource, events Events,
pwd string, parentSpan opentracing.Span) (*Context, error) {
ctx := &Context{
Diag: d,
Host: host,
@ -31,7 +32,7 @@ func NewContext(d diag.Sink, host Host, cfg ConfigSource, pwd string, parentSpan
tracingSpan: parentSpan,
}
if host == nil {
h, err := NewDefaultHost(ctx, cfg)
h, err := NewDefaultHost(ctx, cfg, events)
if err != nil {
return nil, err
}


@ -47,11 +47,20 @@ type Host interface {
Close() error
}
// Events allows higher-level consumers of the plugin model to attach callbacks on
// plugin load events.
type Events interface {
// OnPluginLoad is fired by the plugin host whenever a new plugin is successfully loaded.
// newPlugin is the plugin that was loaded.
OnPluginLoad(newPlugin workspace.PluginInfo) error
}
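// For illustration only: a trivial Events implementation that simply records
// each plugin the host loads. The engine's real listener (pluginActions,
// shown earlier in this diff) forwards each load to
// SnapshotManager.RecordPlugin instead. An instance of this would be passed
// as the `events` argument to NewDefaultHost below (or to plugin.NewContext).
type recordingEvents struct {
	loaded []workspace.PluginInfo
}

func (e *recordingEvents) OnPluginLoad(newPlugin workspace.PluginInfo) error {
	e.loaded = append(e.loaded, newPlugin)
	return nil
}

var _ Events = (*recordingEvents)(nil)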
// NewDefaultHost implements the standard plugin logic, using the standard installation root to find them.
func NewDefaultHost(ctx *Context, config ConfigSource) (Host, error) {
func NewDefaultHost(ctx *Context, config ConfigSource, events Events) (Host, error) {
host := &defaultHost{
ctx: ctx,
config: config,
events: events,
analyzerPlugins: make(map[tokens.QName]*analyzerPlugin),
languagePlugins: make(map[string]*languagePlugin),
resourcePlugins: make(map[tokens.Package]*resourcePlugin),
@ -84,6 +93,7 @@ type pluginLoadRequest struct {
type defaultHost struct {
ctx *Context // the shared context for this host.
config ConfigSource // the source for provider configuration parameters.
events Events // optional callbacks for plugin load events
analyzerPlugins map[tokens.QName]*analyzerPlugin // a cache of analyzer plugins and their processes.
languagePlugins map[string]*languagePlugin // a cache of language plugins and their processes.
resourcePlugins map[tokens.Package]*resourcePlugin // a cache of resource plugins and their processes.
@ -150,6 +160,11 @@ func (host *defaultHost) Analyzer(name tokens.QName) (Analyzer, error) {
// Memoize the result.
host.plugins = append(host.plugins, info)
host.analyzerPlugins[name] = &analyzerPlugin{Plugin: plug, Info: info}
if host.events != nil {
if eventerr := host.events.OnPluginLoad(info); eventerr != nil {
return nil, errors.Wrapf(eventerr, "failed to perform plugin load callback")
}
}
}
return plug, err
@ -221,6 +236,11 @@ func (host *defaultHost) Provider(pkg tokens.Package, version *semver.Version) (
// Memoize the result.
host.plugins = append(host.plugins, info)
host.resourcePlugins[pkg] = &resourcePlugin{Plugin: plug, Info: info}
if host.events != nil {
if eventerr := host.events.OnPluginLoad(info); eventerr != nil {
return nil, errors.Wrapf(eventerr, "failed to perform plugin load callback")
}
}
}
return plug, err
@ -250,6 +270,11 @@ func (host *defaultHost) LanguageRuntime(runtime string) (LanguageRuntime, error
// Memoize the result.
host.plugins = append(host.plugins, info)
host.languagePlugins[runtime] = &languagePlugin{Plugin: plug, Info: info}
if host.events != nil {
if eventerr := host.events.OnPluginLoad(info); eventerr != nil {
return nil, errors.Wrapf(eventerr, "failed to perform plugin load callback")
}
}
}
return plug, err