pulumi/pkg/engine/journal.go
Pat Gavlin eaac9280f5
Refactor lifecycle tests. (#5575)
Move these tests to a new package, `lifecycletest`, that also exposes
APIs that allow consumers to implement their own lifecycle tests. This
is intended to ease the burden of testing plugin implementations and to
set the stage for cleaning up the lifecycle tests themselves.

This involves two changes to the public API, only one of which is
strictly necessary:

- The `host` field of `UpdateOptions` is now exported
- The `Journal` type has been moved from test-only code to the package
  proper

The former change is necessary, as it is the mechanism by which package
consumers may inject their own plugin loaders. I was reluctant to expose
this field originally because I wanted to ensure that the behavior of
packages that embed Pulumi is consistent with that of the Pulumi CLI
with respect to plugin loading. I now believe that the risk of consumers
changing this behavior outside of test scenarios is low enough that we
can expose this field. This may also be useful for future scenarios,
e.g. statically linking providers and Pulumi programs.

The latter change is not necessary, but fleshes out the engine package
into a more complete toolkit. Downstream consumers may use the Journal
type to conveniently implement snapshotting.
2020-10-15 10:35:09 -07:00

204 lines
5.6 KiB
Go

package engine
import (
"github.com/pkg/errors"
"github.com/pulumi/pulumi/pkg/v2/resource/deploy"
"github.com/pulumi/pulumi/pkg/v2/secrets"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
)
var _ = SnapshotManager((*Journal)(nil))
type JournalEntryKind int
const (
JournalEntryBegin JournalEntryKind = 0
JournalEntrySuccess JournalEntryKind = 1
JournalEntryFailure JournalEntryKind = 2
JournalEntryOutputs JournalEntryKind = 4
)
type JournalEntry struct {
Kind JournalEntryKind
Step deploy.Step
}
type JournalEntries []JournalEntry
func (entries JournalEntries) Snap(base *deploy.Snapshot) *deploy.Snapshot {
// Build up a list of current resources by replaying the journal.
resources, dones := []*resource.State{}, make(map[*resource.State]bool)
ops, doneOps := []resource.Operation{}, make(map[*resource.State]bool)
for _, e := range entries {
logging.V(7).Infof("%v %v (%v)", e.Step.Op(), e.Step.URN(), e.Kind)
// Begin journal entries add pending operations to the snapshot. As we see success or failure
// entries, we'll record them in doneOps.
switch e.Kind {
case JournalEntryBegin:
switch e.Step.Op() {
case deploy.OpCreate, deploy.OpCreateReplacement:
ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeCreating))
case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
ops = append(ops, resource.NewOperation(e.Step.Old(), resource.OperationTypeDeleting))
case deploy.OpRead, deploy.OpReadReplacement:
ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeReading))
case deploy.OpUpdate:
ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeUpdating))
case deploy.OpImport, deploy.OpImportReplacement:
ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeImporting))
}
case JournalEntryFailure, JournalEntrySuccess:
switch e.Step.Op() {
// nolint: lll
case deploy.OpCreate, deploy.OpCreateReplacement, deploy.OpRead, deploy.OpReadReplacement, deploy.OpUpdate,
deploy.OpImport, deploy.OpImportReplacement:
doneOps[e.Step.New()] = true
case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
doneOps[e.Step.Old()] = true
}
}
// Now mark resources done as necessary.
if e.Kind == JournalEntrySuccess {
switch e.Step.Op() {
case deploy.OpSame, deploy.OpUpdate:
resources = append(resources, e.Step.New())
dones[e.Step.Old()] = true
case deploy.OpCreate, deploy.OpCreateReplacement:
resources = append(resources, e.Step.New())
if old := e.Step.Old(); old != nil && old.PendingReplacement {
dones[old] = true
}
case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
if old := e.Step.Old(); !old.PendingReplacement {
dones[old] = true
}
case deploy.OpReplace:
// do nothing.
case deploy.OpRead, deploy.OpReadReplacement:
resources = append(resources, e.Step.New())
if e.Step.Old() != nil {
dones[e.Step.Old()] = true
}
case deploy.OpRemovePendingReplace:
dones[e.Step.Old()] = true
case deploy.OpImport, deploy.OpImportReplacement:
resources = append(resources, e.Step.New())
dones[e.Step.New()] = true
}
}
}
// Append any resources from the base snapshot that were not produced by the current snapshot.
// See backend.SnapshotManager.snap for why this works.
if base != nil {
for _, res := range base.Resources {
if !dones[res] {
resources = append(resources, res)
}
}
}
// Append any pending operations.
var operations []resource.Operation
for _, op := range ops {
if !doneOps[op.Resource] {
operations = append(operations, op)
}
}
// If we have a base snapshot, copy over its secrets manager.
var secretsManager secrets.Manager
if base != nil {
secretsManager = base.SecretsManager
}
manifest := deploy.Manifest{}
manifest.Magic = manifest.NewMagic()
return deploy.NewSnapshot(manifest, secretsManager, resources, operations)
}
type Journal struct {
entries JournalEntries
events chan JournalEntry
cancel chan bool
done chan bool
}
func (j *Journal) Entries() []JournalEntry {
<-j.done
return j.entries
}
func (j *Journal) Close() error {
close(j.cancel)
<-j.done
return nil
}
func (j *Journal) BeginMutation(step deploy.Step) (SnapshotMutation, error) {
select {
case j.events <- JournalEntry{Kind: JournalEntryBegin, Step: step}:
return j, nil
case <-j.cancel:
return nil, errors.New("journal closed")
}
}
func (j *Journal) End(step deploy.Step, success bool) error {
kind := JournalEntryFailure
if success {
kind = JournalEntrySuccess
}
select {
case j.events <- JournalEntry{Kind: kind, Step: step}:
return nil
case <-j.cancel:
return errors.New("journal closed")
}
}
func (j *Journal) RegisterResourceOutputs(step deploy.Step) error {
select {
case j.events <- JournalEntry{Kind: JournalEntryOutputs, Step: step}:
return nil
case <-j.cancel:
return errors.New("journal closed")
}
}
func (j *Journal) RecordPlugin(plugin workspace.PluginInfo) error {
return nil
}
func (j *Journal) Snap(base *deploy.Snapshot) *deploy.Snapshot {
return j.entries.Snap(base)
}
func NewJournal() *Journal {
j := &Journal{
events: make(chan JournalEntry),
cancel: make(chan bool),
done: make(chan bool),
}
go func() {
for {
select {
case <-j.cancel:
close(j.done)
return
case e := <-j.events:
j.entries = append(j.entries, e)
}
}
}()
return j
}