package backend

import (
	"time"

	"github.com/pkg/errors"

	"github.com/pulumi/pulumi/pkg/engine"
	"github.com/pulumi/pulumi/pkg/resource"
	"github.com/pulumi/pulumi/pkg/resource/deploy"
	"github.com/pulumi/pulumi/pkg/util/contract"
	"github.com/pulumi/pulumi/pkg/util/logging"
	"github.com/pulumi/pulumi/pkg/version"
	"github.com/pulumi/pulumi/pkg/workspace"
)

// SnapshotPersister is an interface implemented by our backends that implement snapshot
// persistence. In order to fit into our current model, snapshot persisters have two functions:
// saving snapshots and invalidating already-persisted snapshots.
type SnapshotPersister interface {
	// Invalidate invalidates the last snapshot that was persisted. This is done as the first step
	// of performing a mutation on the snapshot. Returns an error if the invalidation failed.
	Invalidate() error

	// Save persists the given snapshot. Returns an error if the persistence failed.
	Save(snapshot *deploy.Snapshot) error
}

// SnapshotManager is an implementation of engine.SnapshotManager that inspects steps and performs
// mutations on the global snapshot object serially. This implementation maintains two bits of state: the "base"
// snapshot, which is completely immutable and represents the state of the world prior to the application
// of the current plan, and a "new" list of resources, which consists of the resources that were operated upon
// by the current plan.
//
// Important to note is that, although this SnapshotManager is designed to be easily convertible into a thread-safe
// implementation, the code as it is today is *not thread safe*. In particular, it is not legal for there to be
// more than one `SnapshotMutation` active at any point in time. This is because this SnapshotManager invalidates
// the last persisted snapshot in `BeginMutation`. This is designed to match existing behavior and will not
// be the state of things going forward.
//
// The resources stored in the `resources` slice are pointers to resource objects allocated by the engine.
// This is subtle and a little confusing. The reason for this is that the engine directly mutates resource objects
// that it creates and expects those mutations to be persisted directly to the snapshot.
type SnapshotManager struct {
	persister        SnapshotPersister        // The persister responsible for invalidating and persisting the snapshot
	baseSnapshot     *deploy.Snapshot         // The base snapshot for this plan
	resources        []*resource.State        // The list of resources operated upon by this plan
	dones            map[*resource.State]bool // The set of resources that have already been operated upon by this plan
	doVerify         bool                     // If true, verify the snapshot after each mutation
	plugins          []workspace.PluginInfo   // The list of plugins loaded by the plan, to be saved in the manifest
	mutationRequests chan func()              // The queue of mutation requests, to be retired serially by the manager
}

var _ engine.SnapshotManager = (*SnapshotManager)(nil)

// Close shuts down the manager's mutation-processing goroutine by closing the
// mutation request channel. No further mutations may be requested after Close.
func (sm *SnapshotManager) Close() error {
	close(sm.mutationRequests)
	return nil
}
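// What follows is a minimal sketch of a SnapshotPersister, for illustration
// only: an in-memory persister that records the most recently saved snapshot
// and whether it is currently believed valid. This type is hypothetical and
// not used by any backend; real persisters write to the Pulumi service or to
// local state files.
type inMemoryPersister struct {
	latest *deploy.Snapshot
	valid  bool
}

func (p *inMemoryPersister) Invalidate() error {
	// Mark the stored snapshot as suspect before a mutation begins.
	p.valid = false
	return nil
}

func (p *inMemoryPersister) Save(snapshot *deploy.Snapshot) error {
	// Record the new snapshot and mark it valid again.
	p.latest, p.valid = snapshot, true
	return nil
}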
// If you need to understand what's going on in this file, start here!
//
// mutate is the serialization point for reads and writes of the global snapshot state.
// The given function will be, at the time of its invocation, the only function allowed to
// mutate state within the SnapshotManager.
//
// Serialization is performed by pushing the mutation function onto a channel, where another
// goroutine is polling the channel and executing the mutation functions as they come.
// Immediately after the mutating function is run, a new snapshot (with a fresh manifest) is
// produced and persisted; if verification is enabled and the save succeeds, the snapshot's
// integrity is then checked, and any verification failure is returned as the mutation's error.
//
// You should never observe or mutate the global snapshot without using this function unless
// you have a very good justification.
func (sm *SnapshotManager) mutate(mutator func()) error {
	responseChan := make(chan error)
	sm.mutationRequests <- func() {
		mutator()
		snap := sm.snap()
		err := sm.persister.Save(snap)
		if err == nil && sm.doVerify {
			if err = snap.VerifyIntegrity(); err != nil {
				err = errors.Wrapf(err, "after mutation of snapshot")
			}
		}
		responseChan <- err
	}

	return <-responseChan
}

// RegisterResourceOutputs handles the registering of outputs on a Step that has already
// completed. This is accomplished by doing an in-place mutation of the resources currently
// resident in the snapshot.
//
// Due to the way this is currently implemented, the engine directly mutates output properties
// on the resource State object that it created. Since we are storing pointers to these objects
// in the `resources` slice, we need only do a no-op mutation in order to flush these new
// mutations to disk.
//
// Note that this is completely not thread-safe and defeats the purpose of having a `mutate` callback
// entirely, but the hope is that this state of things will not be permanent.
func (sm *SnapshotManager) RegisterResourceOutputs(step deploy.Step) error {
	return sm.refresh()
}

// RecordPlugin records that the current plan loaded a plugin and saves it in the snapshot.
func (sm *SnapshotManager) RecordPlugin(plugin workspace.PluginInfo) error {
	logging.V(9).Infof("SnapshotManager: RecordPlugin(%v)", plugin)
	return sm.mutate(func() {
		sm.plugins = append(sm.plugins, plugin)
	})
}

// BeginMutation signals to the SnapshotManager that the engine intends to mutate the global snapshot
// by performing the given Step. This function gives the SnapshotManager a chance to record the
// intent to mutate before the mutation occurs.
func (sm *SnapshotManager) BeginMutation(step deploy.Step) (engine.SnapshotMutation, error) {
	contract.Require(step != nil, "step != nil")
	logging.V(9).Infof("SnapshotManager: Beginning mutation for step `%s` on resource `%s`", step.Op(), step.URN())

	// This is for compatibility with the existing update model in the service. Invalidating a
	// stack sets a bit in a database indicating that the stored snapshot is not valid.
	if err := sm.persister.Invalidate(); err != nil {
		logging.V(9).Infof("SnapshotManager: Failed to invalidate snapshot: %s", err.Error())
		return nil, err
	}

	switch step.Op() {
	case deploy.OpSame:
		return &sameSnapshotMutation{sm}, nil
	case deploy.OpCreate, deploy.OpCreateReplacement:
		return &createSnapshotMutation{sm}, nil
	case deploy.OpUpdate:
		return &updateSnapshotMutation{sm}, nil
	case deploy.OpDelete, deploy.OpDeleteReplaced:
		return &deleteSnapshotMutation{sm}, nil
	case deploy.OpReplace:
		return &replaceSnapshotMutation{}, nil
	}

	contract.Failf("unknown StepOp: %s", step.Op())
	return nil, nil
}
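// The sketch below illustrates the intended BeginMutation/End lifecycle; it is
// hypothetical and not called anywhere in this package (the real call sites
// live in the engine). The apply callback stands in for actually performing
// the step against the resource provider.
func applyAndRecord(manager *SnapshotManager, step deploy.Step, apply func() error) error {
	mutation, err := manager.BeginMutation(step)
	if err != nil {
		return err
	}
	applyErr := apply()
	// End records the outcome; on success the appropriate markDone/markNew
	// calls run and the resulting snapshot is persisted.
	if endErr := mutation.End(step, applyErr == nil); endErr != nil {
		return endErr
	}
	return applyErr
}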
// All SnapshotMutation implementations in this file follow the same basic formula:
// mark the "old" state as done and mark the "new" state as new. The two special
// cases are Create (where the "old" state does not exist) and Delete (where the "new" state
// does not exist).
//
// Marking a resource state as done prevents it from being persisted to the snapshot in
// the `snap` function. Marking a resource state as new enables it to be persisted to
// the snapshot in `snap`. See the comments in `snap` for more details.

type sameSnapshotMutation struct {
	manager *SnapshotManager
}

func (ssm *sameSnapshotMutation) End(step deploy.Step, successful bool) error {
	contract.Require(step != nil, "step != nil")
	logging.V(9).Infof("SnapshotManager: sameSnapshotMutation.End(..., %v)", successful)
	return ssm.manager.mutate(func() {
		if successful {
			ssm.manager.markDone(step.Old())
			ssm.manager.markNew(step.New())
		}
	})
}

type createSnapshotMutation struct {
	manager *SnapshotManager
}

func (csm *createSnapshotMutation) End(step deploy.Step, successful bool) error {
	contract.Require(step != nil, "step != nil")
	logging.V(9).Infof("SnapshotManager: createSnapshotMutation.End(..., %v)", successful)
	return csm.manager.mutate(func() {
		if successful {
			// There is some very subtle behind-the-scenes magic here that
			// comes into play whenever this create is a CreateReplacement.
			//
			// Despite intending for the base snapshot to be immutable, the engine
			// does in fact mutate it by setting a `Delete` flag on resources
			// being replaced as part of a Create-Before-Delete replacement sequence.
			// Since we are storing the base snapshot and all resources by reference
			// (we have pointers to engine-allocated objects), this transparently
			// "just works" for the SnapshotManager.
			csm.manager.markNew(step.New())
		}
	})
}

type updateSnapshotMutation struct {
	manager *SnapshotManager
}

func (usm *updateSnapshotMutation) End(step deploy.Step, successful bool) error {
	contract.Require(step != nil, "step != nil")
	logging.V(9).Infof("SnapshotManager: updateSnapshotMutation.End(..., %v)", successful)
	return usm.manager.mutate(func() {
		if successful {
			usm.manager.markDone(step.Old())
			usm.manager.markNew(step.New())
		}
	})
}

type deleteSnapshotMutation struct {
	manager *SnapshotManager
}

func (dsm *deleteSnapshotMutation) End(step deploy.Step, successful bool) error {
	contract.Require(step != nil, "step != nil")
	logging.V(9).Infof("SnapshotManager: deleteSnapshotMutation.End(..., %v)", successful)
	return dsm.manager.mutate(func() {
		if successful {
			contract.Assert(!step.Old().Protect)
			dsm.manager.markDone(step.Old())
		}
	})
}

type replaceSnapshotMutation struct{}

func (rsm *replaceSnapshotMutation) End(step deploy.Step, successful bool) error {
	return nil
}

// refresh does a no-op mutation that forces the SnapshotManager to persist the
// snapshot exactly as it is currently to disk. This is useful when a mutation
// has failed and we do not intend to persist the failed mutation.
func (sm *SnapshotManager) refresh() error {
	logging.V(9).Infof("SnapshotManager: refresh()")
	return sm.mutate(func() {})
}

// markDone marks a resource as having been processed. Resources that have been marked
// in this manner won't be persisted in the snapshot.
func (sm *SnapshotManager) markDone(state *resource.State) {
	contract.Assert(state != nil)
	sm.dones[state] = true
	logging.V(9).Infof("Marked old state snapshot as done: %v", state.URN)
}
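// A hedged illustration of why `dones` is keyed by pointer identity rather
// than by URN (hypothetical states, for exposition only; real callers go
// through `mutate`): during an update, the old and new states share a URN but
// are distinct objects, and only the old pointer must be filtered out of the
// snapshot.
func exampleDonesIdentity(sm *SnapshotManager) {
	old := &resource.State{URN: "urn:pulumi:dev::proj::pkg:mod:Res::r"} // hypothetical URN
	updated := &resource.State{URN: old.URN}                           // same URN, different pointer
	sm.markDone(old)
	sm.markNew(updated)
	// snap() now drops `old` but keeps `updated`, even though their URNs match.
}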
// markNew marks a resource as existing in the new snapshot. This occurs on
// successful non-deletion operations where the given state is the new state
// of a resource that will be persisted to the snapshot.
func (sm *SnapshotManager) markNew(state *resource.State) {
	contract.Assert(state != nil)
	sm.resources = append(sm.resources, state)
	logging.V(9).Infof("Appended new state snapshot to be written: %v", state.URN)
}

// snap produces a new Snapshot given the base snapshot and a list of resources that the current
// plan has created.
func (sm *SnapshotManager) snap() *deploy.Snapshot {
	// At this point we have two resource DAGs. One of these is the base DAG for this plan; the other is the current
	// DAG for this plan. Any resource r may be present in both DAGs. In order to produce a snapshot, we need to merge
	// these DAGs such that all resource dependencies are correctly preserved. Conceptually, the merge proceeds as
	// follows:
	//
	// - Begin with an empty merged DAG.
	// - For each resource r in the current DAG, insert r and its outgoing edges into the merged DAG.
	// - For each resource r in the base DAG:
	//     - If r is in the merged DAG, we are done: if the resource is in the merged DAG, it must have been in the
	//       current DAG, which accurately captures its current dependencies.
	//     - If r is not in the merged DAG, insert it and its outgoing edges into the merged DAG.
	//
	// Physically, however, each DAG is represented as a list of resources without explicit dependency edges. In place
	// of edges, it is assumed that the list represents a valid topological sort of its source DAG. Thus, any resource
	// r at index i in a list L must be assumed to be dependent on all resources in L with index j s.t. j < i. Due to
	// this representation, we implement the algorithm above as follows to produce a merged list that represents a
	// valid topological sort of the merged DAG:
	//
	// - Begin with an empty merged list.
	// - For each resource r in the current list, append r to the merged list. r must be in a correct location in the
	//   merged list, as its position relative to its assumed dependencies has not changed.
	// - For each resource r in the base list:
	//     - If r is in the merged list, we are done by the logic given in the original algorithm.
	//     - If r is not in the merged list, append r to the merged list. r must be in a correct location in the
	//       merged list:
	//         - If any of r's dependencies were in the current list, they must already be in the merged list and
	//           their relative order w.r.t. r has not changed.
	//         - If any of r's dependencies were not in the current list, they must already be in the merged list,
	//           as they would have been appended to the list before r.

	// Start with a copy of the resources produced during the evaluation of the current plan.
	resources := make([]*resource.State, len(sm.resources))
	copy(resources, sm.resources)

	// Append any resources from the base plan that were not produced by the current plan.
	if base := sm.baseSnapshot; base != nil {
		for _, res := range base.Resources {
			if !sm.dones[res] {
				resources = append(resources, res)
			}
		}
	}

	manifest := deploy.Manifest{
		Time:    time.Now(),
		Version: version.Version,
		Plugins: sm.plugins,
	}
	manifest.Magic = manifest.NewMagic()
	return deploy.NewSnapshot(manifest, resources)
}
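// A worked example of the merge performed by snap, as a sketch (hypothetical
// resources, for exposition only). Suppose the base list is [A, B, C], where B
// may depend on A and C may depend on A and B; the current plan produced new
// states A' and B' (marking the old A and B as done) and never operated on C.
// Then:
//
//	sm.resources (current list): [A', B']
//	sm.dones:                    {A, B}
//	result of snap():            [A', B', C]
//
// The merged list remains a valid topological sort: A' and B' keep the same
// relative order that A and B had in the base list, and C still follows
// everything it may depend on.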
// NewSnapshotManager creates a new SnapshotManager using the given persister and base snapshot.
//
// It is *very important* that the baseSnap pointer refers to the same Snapshot
// given to the engine! The engine will mutate this object and correctness of the
// SnapshotManager depends on being able to observe this mutation. (This is not ideal...)
func NewSnapshotManager(persister SnapshotPersister, baseSnap *deploy.Snapshot) *SnapshotManager {
	manager := &SnapshotManager{
		persister:        persister,
		baseSnapshot:     baseSnap,
		dones:            make(map[*resource.State]bool),
		doVerify:         true,
		mutationRequests: make(chan func()),
	}

	go func() {
		// Retire mutation requests one at a time. This loop is the single
		// point of serialization for all snapshot mutations; it exits when
		// Close closes the channel.
		for request := range manager.mutationRequests {
			request()
		}
	}()

	return manager
}
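// A minimal wiring sketch (hypothetical call site; real construction happens
// in the backend when an update begins). Note that `base` must be the same
// Snapshot pointer handed to the engine, per the warning above.
func exampleWireSnapshotManager(persister SnapshotPersister, base *deploy.Snapshot) {
	manager := NewSnapshotManager(persister, base)
	// ... run the engine with `manager` as its engine.SnapshotManager ...
	_ = manager.Close() // ignore the (always-nil) close error in this sketch
}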