pulumi/pkg/engine/journal.go
Ian Wahbe 272c4643b2
Update error handling (#8406)
This is the result of a change applied via `go-rewrap-errors`.
2021-11-12 18:37:17 -08:00

204 lines
5.6 KiB
Go

package engine
import (
"errors"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/secrets"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
)
// Compile-time check that *Journal satisfies SnapshotManager.
var _ SnapshotManager = (*Journal)(nil)
// JournalEntryKind identifies what a JournalEntry records about its step.
type JournalEntryKind int

const (
	// JournalEntryBegin is the kind recorded by Journal.BeginMutation.
	JournalEntryBegin JournalEntryKind = 0
	// JournalEntrySuccess is the kind recorded by Journal.End when success is true.
	JournalEntrySuccess JournalEntryKind = 1
	// JournalEntryFailure is the kind recorded by Journal.End when success is false.
	JournalEntryFailure JournalEntryKind = 2
	// JournalEntryOutputs is the kind recorded by Journal.RegisterResourceOutputs.
	JournalEntryOutputs JournalEntryKind = 4
)
// JournalEntry is a single record in a journal: a step paired with the kind of
// event that was recorded for it.
type JournalEntry struct {
	// Kind says which event this entry records for Step.
	Kind JournalEntryKind
	// Step is the deployment step the entry refers to.
	Step deploy.Step
}
// JournalEntries is an ordered list of journal entries. Its Snap method replays
// the entries in slice order to build a snapshot.
type JournalEntries []JournalEntry
// Snap replays the journal entries, in order, on top of base and returns the
// resulting snapshot.
//
// Begin entries register a pending operation for the step's resource. Success
// and failure entries mark that operation as finished. Success entries
// additionally record the step's new state (where the step produces one) and
// mark the state it supersedes so that it is not carried over from base.
// Entries of any other kind (for example JournalEntryOutputs) are only logged.
//
// base may be nil, in which case no base resources are carried over and the
// returned snapshot is created with a nil secrets manager.
func (entries JournalEntries) Snap(base *deploy.Snapshot) *deploy.Snapshot {
	// live accumulates the states produced by successful steps, in journal
	// order; replaced marks states that must not be copied over from base.
	live := []*resource.State{}
	replaced := make(map[*resource.State]bool)

	// started accumulates every operation that was begun; finished marks the
	// states whose operation has since succeeded or failed.
	started := []resource.Operation{}
	finished := make(map[*resource.State]bool)

	for _, entry := range entries {
		step, kind := entry.Step, entry.Kind
		op := step.Op()
		logging.V(7).Infof("%v %v (%v)", op, step.URN(), kind)

		// A begin entry only adds a pending operation to the snapshot.
		if kind == JournalEntryBegin {
			switch op {
			case deploy.OpCreate, deploy.OpCreateReplacement:
				started = append(started, resource.NewOperation(step.New(), resource.OperationTypeCreating))
			case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
				started = append(started, resource.NewOperation(step.Old(), resource.OperationTypeDeleting))
			case deploy.OpRead, deploy.OpReadReplacement:
				started = append(started, resource.NewOperation(step.New(), resource.OperationTypeReading))
			case deploy.OpUpdate:
				started = append(started, resource.NewOperation(step.New(), resource.OperationTypeUpdating))
			case deploy.OpImport, deploy.OpImportReplacement:
				started = append(started, resource.NewOperation(step.New(), resource.OperationTypeImporting))
			}
			continue
		}

		// Only success and failure entries affect the snapshot from here on.
		if kind != JournalEntrySuccess && kind != JournalEntryFailure {
			continue
		}

		// Whether it succeeded or failed, the operation is no longer pending.
		// The key is the same state the begin entry registered: the new state
		// for creating/reading/updating/importing steps, the old state for
		// deleting steps.
		switch op {
		case deploy.OpCreate, deploy.OpCreateReplacement,
			deploy.OpRead, deploy.OpReadReplacement,
			deploy.OpUpdate,
			deploy.OpImport, deploy.OpImportReplacement:
			finished[step.New()] = true
		case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
			finished[step.Old()] = true
		}

		// A failed step contributes no resource changes.
		if kind == JournalEntryFailure {
			continue
		}

		// The step succeeded: record what it produced and what it superseded.
		switch op {
		case deploy.OpSame, deploy.OpUpdate:
			live = append(live, step.New())
			replaced[step.Old()] = true
		case deploy.OpCreate, deploy.OpCreateReplacement:
			live = append(live, step.New())
			// Only an old state that was pending replacement is superseded here.
			if prev := step.Old(); prev != nil && prev.PendingReplacement {
				replaced[prev] = true
			}
		case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
			// An old state that is pending replacement is left unmarked, so it
			// is still carried over from base.
			if prev := step.Old(); !prev.PendingReplacement {
				replaced[prev] = true
			}
		case deploy.OpReplace:
			// Nothing to record.
		case deploy.OpRead, deploy.OpReadReplacement:
			live = append(live, step.New())
			if prev := step.Old(); prev != nil {
				replaced[prev] = true
			}
		case deploy.OpRemovePendingReplace:
			replaced[step.Old()] = true
		case deploy.OpImport, deploy.OpImportReplacement:
			imported := step.New()
			live = append(live, imported)
			replaced[imported] = true
		}
	}

	// Carry over every base resource that the journal did not supersede, and
	// reuse the base snapshot's secrets manager.
	// See backend.SnapshotManager.snap for why this works.
	var secretsManager secrets.Manager
	if base != nil {
		for _, prior := range base.Resources {
			if replaced[prior] {
				continue
			}
			live = append(live, prior)
		}
		secretsManager = base.SecretsManager
	}

	// Keep only the operations that were begun but never finished.
	var pending []resource.Operation
	for _, begun := range started {
		if finished[begun.Resource] {
			continue
		}
		pending = append(pending, begun)
	}

	manifest := deploy.Manifest{}
	manifest.Magic = manifest.NewMagic()
	return deploy.NewSnapshot(manifest, secretsManager, live, pending)
}
// Journal is a SnapshotManager that records each mutation it is told about as
// a JournalEntry. Entries are collected by a goroutine started in NewJournal.
type Journal struct {
	// entries holds the recorded entries; the goroutine started by NewJournal appends to it.
	entries JournalEntries
	// events carries entries from BeginMutation, End and RegisterResourceOutputs to that goroutine.
	events chan JournalEntry
	// cancel is closed by Close to stop recording.
	cancel chan bool
	// done is closed by the recording goroutine once it has observed cancel.
	done chan bool
}
// Entries returns the recorded journal entries. It blocks until the done
// channel has been closed, which the recording goroutine does after Close has
// closed the cancel channel.
func (j *Journal) Entries() []JournalEntry {
	// Wait for the recording goroutine to finish before reading entries.
	<-j.done
	return j.entries
}
// Close stops the journal: it closes the cancel channel and waits for the
// recording goroutine to signal completion on done. It always returns nil.
//
// NOTE(review): a second call closes an already-closed channel, which panics
// in Go — presumably callers close a journal exactly once; confirm.
func (j *Journal) Close() error {
	close(j.cancel)
	// Block until the recording goroutine has exited.
	<-j.done
	return nil
}
// BeginMutation records a JournalEntryBegin entry for step and returns the
// journal itself as the SnapshotMutation used to end the step. If the journal
// has been closed, it returns a "journal closed" error instead.
func (j *Journal) BeginMutation(step deploy.Step) (SnapshotMutation, error) {
	begin := JournalEntry{Kind: JournalEntryBegin, Step: step}

	// Block until the entry is handed to the recorder or the journal is closed.
	select {
	case <-j.cancel:
		return nil, errors.New("journal closed")
	case j.events <- begin:
		return j, nil
	}
}
// End records the outcome of step: a JournalEntrySuccess entry when success is
// true, a JournalEntryFailure entry otherwise. If the journal has been closed,
// it returns a "journal closed" error instead.
func (j *Journal) End(step deploy.Step, success bool) error {
	outcome := JournalEntry{Kind: JournalEntrySuccess, Step: step}
	if !success {
		outcome.Kind = JournalEntryFailure
	}

	// Block until the entry is handed to the recorder or the journal is closed.
	select {
	case <-j.cancel:
		return errors.New("journal closed")
	case j.events <- outcome:
		return nil
	}
}
// RegisterResourceOutputs records a JournalEntryOutputs entry for step. If the
// journal has been closed, it returns a "journal closed" error instead.
func (j *Journal) RegisterResourceOutputs(step deploy.Step) error {
	outputs := JournalEntry{Kind: JournalEntryOutputs, Step: step}

	// Block until the entry is handed to the recorder or the journal is closed.
	select {
	case <-j.cancel:
		return errors.New("journal closed")
	case j.events <- outputs:
		return nil
	}
}
// RecordPlugin is a no-op: the journal does not record plugin information. It
// ignores plugin and always returns nil.
func (j *Journal) RecordPlugin(plugin workspace.PluginInfo) error {
	return nil
}
// Snap returns the snapshot obtained by replaying the journal's recorded
// entries on top of base (see JournalEntries.Snap).
//
// NOTE(review): unlike Entries, this reads j.entries without first waiting on
// j.done, while the goroutine started by NewJournal may still be appending —
// presumably callers Close the journal before calling Snap; confirm.
func (j *Journal) Snap(base *deploy.Snapshot) *deploy.Snapshot {
	return j.entries.Snap(base)
}
// NewJournal creates a Journal and starts the goroutine that records entries
// sent on its events channel. The goroutine runs until the cancel channel is
// closed (see Close), at which point it closes done and exits.
func NewJournal() *Journal {
	journal := &Journal{
		events: make(chan JournalEntry),
		cancel: make(chan bool),
		done:   make(chan bool),
	}

	go func() {
		// Signal completion to Close and Entries when the recorder exits.
		defer close(journal.done)
		for {
			select {
			case <-journal.cancel:
				return
			case entry := <-journal.events:
				journal.entries = append(journal.entries, entry)
			}
		}
	}()

	return journal
}