// Copyright 2016-2018, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package deploy import ( "context" "fmt" "math" pbempty "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "google.golang.org/grpc" "github.com/pulumi/pulumi/pkg/resource/plugin" "github.com/pulumi/pulumi/pkg/tokens" "github.com/pulumi/pulumi/pkg/util/contract" "github.com/pulumi/pulumi/pkg/util/result" "github.com/pulumi/pulumi/pkg/util/rpcutil" pulumirpc "github.com/pulumi/pulumi/sdk/proto/go" ) // QuerySource evaluates a query program, and provides the ability to synchronously wait for // completion. type QuerySource interface { Wait() result.Result } // NewQuerySource creates a `QuerySource` for some target runtime environment specified by // `runinfo`, and supported by language plugins provided in `plugctx`. func NewQuerySource(cancel context.Context, plugctx *plugin.Context, client BackendClient, runinfo *EvalRunInfo) (QuerySource, error) { // Create a new builtin provider. This provider implements features such as `getStack`. builtins := newBuiltinProvider(client) // First, fire up a resource monitor that will disallow all resource operations, as well as // service calls for things like resource ouptuts of state snapshots. // // NOTE: Using the queryResourceMonitor here is *VERY* important, as its job is to disallow // resource operations in query mode! 
mon, err := newQueryResourceMonitor(builtins) if err != nil { return nil, errors.Wrap(err, "failed to start resource monitor") } // Create a new iterator with appropriate channels, and gear up to go! src := &querySource{ mon: mon, plugctx: plugctx, runinfo: runinfo, runLangPlugin: runLangPlugin, finChan: make(chan result.Result), cancel: cancel, } // Now invoke Run in a goroutine. All subsequent resource creation events will come in over the gRPC channel, // and we will pump them through the channel. If the Run call ultimately fails, we need to propagate the error. src.forkRun() // Finally, return the fresh iterator that the caller can use to take things from here. return src, nil } type querySource struct { mon SourceResourceMonitor // the resource monitor, per iterator. plugctx *plugin.Context // the plugin context. runinfo *EvalRunInfo // the directives to use when running the program. runLangPlugin func(*querySource) result.Result // runs the language plugin. finChan chan result.Result // the channel that communicates completion. done bool // set to true when the evaluation is done. res result.Result // result when the channel is finished. cancel context.Context } func (src *querySource) Close() error { // Cancel the monitor and reclaim any associated resources. src.done = true close(src.finChan) return src.mon.Cancel() } func (src *querySource) Wait() result.Result { // If we are done, quit. if src.done { return src.res } select { case src.res = <-src.finChan: // Language plugin has exited. No need to call `Close`. src.done = true return src.res case <-src.cancel.Done(): src.done = true src.Close() return src.res } } // forkRun evaluate the query program in a separate goroutine. Completion or cancellation will cause // `Wait` to stop blocking and return. func (src *querySource) forkRun() { // Fire up the goroutine to make the RPC invocation against the language runtime. 
As this executes, calls // to queue things up in the resource channel will occur, and we will serve them concurrently. go func() { // Next, launch the language plugin. Communicate the error, if it exists, or nil if the // program exited cleanly. src.finChan <- src.runLangPlugin(src) }() } func runLangPlugin(src *querySource) result.Result { rt := src.runinfo.Proj.Runtime.Name() langhost, err := src.plugctx.Host.LanguageRuntime(rt) if err != nil { return result.FromError(errors.Wrapf(err, "failed to launch language host %s", rt)) } contract.Assertf(langhost != nil, "expected non-nil language host %s", rt) // Make sure to clean up before exiting. defer contract.IgnoreClose(langhost) // Decrypt the configuration. config, err := src.runinfo.Target.Config.Decrypt(src.runinfo.Target.Decrypter) if err != nil { return result.FromError(err) } // Now run the actual program. progerr, bail, err := langhost.Run(plugin.RunInfo{ MonitorAddress: src.mon.Address(), Stack: string(src.runinfo.Target.Name), Project: string(src.runinfo.Proj.Name), Pwd: src.runinfo.Pwd, Program: src.runinfo.Program, Args: src.runinfo.Args, Config: config, DryRun: true, QueryMode: true, Parallel: math.MaxInt32, }) // Check if we were asked to Bail. This a special random constant used for that // purpose. if err == nil && bail { return result.Bail() } if err == nil && progerr != "" { // If the program had an unhandled error; propagate it to the caller. err = errors.Errorf("an unhandled error occurred: %v", progerr) } return result.WrapIfNonNil(err) } // newQueryResourceMonitor creates a new resource monitor RPC server intended to be used in Pulumi's // "query mode". func newQueryResourceMonitor(builtins *builtinProvider) (*queryResmon, error) { // Create our cancellation channel. cancel := make(chan bool) // New up an engine RPC server. queryResmon := &queryResmon{ builtins: builtins, cancel: cancel, } // Fire up a gRPC server and start listening for incomings. 
port, done, err := rpcutil.Serve(0, queryResmon.cancel, []func(*grpc.Server) error{ func(srv *grpc.Server) error { pulumirpc.RegisterResourceMonitorServer(srv, queryResmon) return nil }, }) if err != nil { return nil, err } queryResmon.addr = fmt.Sprintf("127.0.0.1:%d", port) queryResmon.done = done return queryResmon, nil } // queryResmon is a pulumirpc.ResourceMonitor that is meant to run in Pulumi's "query mode". It // performs two critical functions: // // 1. Disallows all resource operations. `queryResmon` intercepts all resource operations and // returns an error instead of allowing them to proceed. // 2. Services requests for stack snapshots. This is primarily to allow us to allow queries across // stack snapshots. type queryResmon struct { builtins *builtinProvider // provides builtins such as `getStack`. addr string // the address the host is listening on. cancel chan bool // a channel that can cancel the server. done chan error // a channel that resolves when the server completes. } var _ SourceResourceMonitor = (*queryResmon)(nil) // Address returns the address at which the monitor's RPC server may be reached. func (rm *queryResmon) Address() string { return rm.addr } // Cancel signals that the engine should be terminated, awaits its termination, and returns any // errors that result. func (rm *queryResmon) Cancel() error { close(rm.cancel) return <-rm.done } // Invoke performs an invocation of a member located in a resource provider. func (rm *queryResmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) { tok := tokens.ModuleMember(req.GetTok()) label := fmt.Sprintf("QueryResourceMonitor.Invoke(%s)", tok) // Fail on all calls to `Invoke` except this one. 
if tok != readStackResourceOutputs { return nil, fmt.Errorf("Query mode does not support invoke call for operation '%s'", tok) } args, err := plugin.UnmarshalProperties( req.GetArgs(), plugin.MarshalOptions{Label: label, KeepUnknowns: true}) if err != nil { return nil, errors.Wrapf(err, "failed to unmarshal %v args", tok) } // Dispatch request for resource outputs to builtin provider. ret, failures, err := rm.builtins.Invoke(tok, args) if err != nil { return nil, errors.Wrapf(err, "invoke %s failed", tok) } mret, err := plugin.MarshalProperties(ret, plugin.MarshalOptions{Label: label, KeepUnknowns: true}) if err != nil { return nil, errors.Wrapf(err, "failed to marshal return") } var chkfails []*pulumirpc.CheckFailure for _, failure := range failures { chkfails = append(chkfails, &pulumirpc.CheckFailure{ Property: string(failure.Property), Reason: failure.Reason, }) } return &pulumirpc.InvokeResponse{Return: mret, Failures: chkfails}, nil } // ReadResource reads the current state associated with a resource from its provider plugin. func (rm *queryResmon) ReadResource(ctx context.Context, req *pulumirpc.ReadResourceRequest) (*pulumirpc.ReadResourceResponse, error) { return nil, fmt.Errorf("Query mode does not support reading resources") } // RegisterResource is invoked by a language process when a new resource has been allocated. func (rm *queryResmon) RegisterResource(ctx context.Context, req *pulumirpc.RegisterResourceRequest) (*pulumirpc.RegisterResourceResponse, error) { return nil, fmt.Errorf("Query mode does not support creating, updating, or deleting resources") } // RegisterResourceOutputs records some new output properties for a resource that have arrived after its initial // provisioning. These will make their way into the eventual checkpoint state file for that resource. 
func (rm *queryResmon) RegisterResourceOutputs(ctx context.Context,
	req *pulumirpc.RegisterResourceOutputsRequest) (*pbempty.Empty, error) {

	// Query mode never accepts output registrations: reject unconditionally.
	err := fmt.Errorf("Query mode does not support registering resource operations")
	return nil, err
}

// SupportsFeature reports whether the query resource monitor supports the feature named by
// `req.Id`. Only "secrets" is supported, so that secret values may be passed as arguments to
// invoke calls; every other feature id yields `HasSupport: false`. It never returns an error.
func (rm *queryResmon) SupportsFeature(ctx context.Context,
	req *pulumirpc.SupportsFeatureRequest) (*pulumirpc.SupportsFeatureResponse, error) {

	resp := &pulumirpc.SupportsFeatureResponse{
		HasSupport: req.Id == "secrets",
	}
	return resp, nil
}