pulumi/pkg/resource/deploy/source_query.go
Matt Ellis f705dde7fb Remove acceptsSecrets from InvokeRequest
In our system, we model secrets as outputs with an additional bit of
metadata that says they are secret. For Read and Register resource
calls, our RPC interface says if the client side of the interface can
handle secrets being returned (i.e. the language SDK knows how to
sniff for the special signature and resolve the output with the
special bit set).

For Invoke, we have no such model. Instead, we return a `Promise<T>`
where T's shape has just regular property fields.  There's no place
for us to tack the secretness onto, since there are no Outputs.

So, for now, don't even return secret values back across the invoke
channel. We can still take them as arguments (which is good) but we
can't even return secrets as part of invoke calls. This is not ideal,
but given the way we model these sources, there's no way around
this.  Fortunately, the results of these invoke calls are not stored in
the checkpoint and since the type is not Output<T> it will be clear
that the underlying value is just present in plaintext. A user that
wants to pass the result of an invoke into a resource can turn an
existing property into a secret via `pulumi.secret`.
2019-05-10 17:07:52 -07:00

301 lines
10 KiB
Go

// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deploy
import (
"context"
"fmt"
"math"
pbempty "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"google.golang.org/grpc"
"github.com/pulumi/pulumi/pkg/resource/plugin"
"github.com/pulumi/pulumi/pkg/tokens"
"github.com/pulumi/pulumi/pkg/util/contract"
"github.com/pulumi/pulumi/pkg/util/result"
"github.com/pulumi/pulumi/pkg/util/rpcutil"
pulumirpc "github.com/pulumi/pulumi/sdk/proto/go"
)
// QuerySource evaluates a query program, and provides the ability to synchronously wait for
// completion.
type QuerySource interface {
// Wait blocks until evaluation of the query program has completed and returns its result.
Wait() result.Result
}
// NewQuerySource creates a `QuerySource` for some target runtime environment specified by
// `runinfo`, and supported by language plugins provided in `plugctx`.
//
// It starts a query-mode resource monitor backed by a builtin provider created from `client`, and
// then begins running the query program in a background goroutine before returning. The `cancel`
// context is the one the returned source's `Wait` observes for cancellation. An error is returned
// only when the resource monitor cannot be started.
func NewQuerySource(cancel context.Context, plugctx *plugin.Context, client BackendClient,
	runinfo *EvalRunInfo) (QuerySource, error) {

	// Fire up the resource monitor on top of a fresh builtin provider (which implements features
	// such as `getStack`).
	//
	// NOTE: Using the query resource monitor here is *VERY* important: its job is to disallow
	// resource operations in query mode, while still servicing calls for things like resource
	// outputs of stack snapshots.
	mon, err := newQueryResourceMonitor(newBuiltinProvider(client))
	if err != nil {
		return nil, errors.Wrap(err, "failed to start resource monitor")
	}

	// Assemble the source with an unbuffered completion channel.
	qs := &querySource{
		cancel:        cancel,
		finChan:       make(chan result.Result),
		mon:           mon,
		plugctx:       plugctx,
		runinfo:       runinfo,
		runLangPlugin: runLangPlugin,
	}

	// Kick off the language plugin in a goroutine. Its result (success or failure) is delivered
	// through the completion channel and surfaced to the caller by `Wait`.
	qs.forkRun()

	return qs, nil
}
// querySource is the `QuerySource` implementation constructed by `NewQuerySource`. It runs the
// language plugin in a goroutine (see `forkRun`) and reports its outcome through `Wait`.
type querySource struct {
mon SourceResourceMonitor // the resource monitor serving the query program; cancelled by `Close`.
plugctx *plugin.Context // the plugin context, used to obtain the language host.
runinfo *EvalRunInfo // the directives to use when running the program.
runLangPlugin func(*querySource) result.Result // runs the language plugin; invoked by `forkRun`.
finChan chan result.Result // the channel that communicates completion from `forkRun` to `Wait`.
done bool // set to true when the evaluation is done (by `Wait` or `Close`).
res result.Result // result received from `finChan`; returned by `Wait` once done.
cancel context.Context // context whose cancellation makes `Wait` stop blocking and close the source.
}
// Close marks the source as done and cancels the resource monitor, returning whatever error the
// monitor's `Cancel` reports.
//
// `finChan` is intentionally left open. The goroutine started by `forkRun` is the channel's only
// sender, and closing the channel from this (receiving) side would make that goroutine's pending
// or later send panic with "send on closed channel"; calling `Close` twice would likewise panic on
// the second `close`. `Wait` never receives from `finChan` again once `done` is set, so no
// receiver relies on the channel being closed.
func (src *querySource) Close() error {
	src.done = true

	// Cancel the monitor and reclaim any associated resources.
	return src.mon.Cancel()
}
// Wait blocks until either the language plugin reports a result on `finChan` or the source's
// `cancel` context is done, and returns the recorded result. Once the source is done, further
// calls return the same recorded result without blocking.
//
// On cancellation the source is closed and `src.res` is returned as-is; this path does not assign
// it.
func (src *querySource) Wait() result.Result {
	if !src.done {
		select {
		case res := <-src.finChan:
			// Language plugin has exited. No need to call `Close`.
			src.res = res
		case <-src.cancel.Done():
			// Cancelled: tear the source down. The error from `Close` is deliberately
			// discarded, as the caller only receives the recorded result.
			src.done = true
			_ = src.Close()
		}
		src.done = true
	}

	return src.res
}
// forkRun evaluates the query program in a separate goroutine. Completion or cancellation will
// cause `Wait` to stop blocking and return.
//
// The goroutine runs `src.runLangPlugin` and then hands its result to `Wait` over `finChan`. The
// hand-off is cancellation-aware: once `src.cancel` is done, `Wait` no longer receives from
// `finChan` (and the source may have been closed), so an unconditional send would either block
// forever, leaking the goroutine, or hit a closed channel and panic. In that case the result is
// dropped and the goroutine exits.
func (src *querySource) forkRun() {
	go func() {
		// Launch the language plugin. The result carries the error, if any, or is nil if the
		// program exited cleanly.
		res := src.runLangPlugin(src)

		// If the query was cancelled while the plugin was running, nobody is waiting for the
		// result any more. Check this first so we never attempt the send after cancellation.
		select {
		case <-src.cancel.Done():
			return
		default:
		}

		// Otherwise deliver the result, but give up if cancellation arrives before `Wait`
		// picks it up.
		select {
		case src.finChan <- res:
		case <-src.cancel.Done():
		}
	}()
}
// runLangPlugin is the language-plugin runner that `NewQuerySource` installs on a `querySource`.
// It loads the language host for the project's runtime, decrypts the target's configuration, and
// runs the program in dry-run query mode against the source's resource monitor.
//
// The returned result is a bail when the host asks for one, wraps the error when launching the
// host, decrypting the configuration, or running the program fails (or when the program reports
// an unhandled error), and is otherwise whatever `result.WrapIfNonNil` yields for a nil error.
func runLangPlugin(src *querySource) result.Result {
	runtimeName := src.runinfo.Proj.Runtime.Name()

	host, err := src.plugctx.Host.LanguageRuntime(runtimeName)
	if err != nil {
		return result.FromError(errors.Wrapf(err, "failed to launch language host %s", runtimeName))
	}
	contract.Assertf(host != nil, "expected non-nil language host %s", runtimeName)

	// Release the language host when we are finished with it.
	defer contract.IgnoreClose(host)

	// The program receives its configuration in decrypted form.
	cfg, err := src.runinfo.Target.Config.Decrypt(src.runinfo.Target.Decrypter)
	if err != nil {
		return result.FromError(err)
	}

	// Describe the run: a dry run, in query mode, pointed at our resource monitor.
	info := plugin.RunInfo{
		MonitorAddress: src.mon.Address(),
		Stack:          string(src.runinfo.Target.Name),
		Project:        string(src.runinfo.Proj.Name),
		Pwd:            src.runinfo.Pwd,
		Program:        src.runinfo.Program,
		Args:           src.runinfo.Args,
		Config:         cfg,
		DryRun:         true,
		QueryMode:      true,
		Parallel:       math.MaxInt32,
	}

	// Now run the actual program.
	progerr, bail, err := host.Run(info)
	if err == nil {
		// The host completed the RPC successfully; inspect what the program itself reported.
		if bail {
			// The host asked us to bail rather than report an error of our own.
			return result.Bail()
		}
		if progerr != "" {
			// The program had an unhandled error; propagate it to the caller.
			err = errors.Errorf("an unhandled error occurred: %v", progerr)
		}
	}

	return result.WrapIfNonNil(err)
}
// newQueryResourceMonitor creates a new resource monitor RPC server intended to be used in Pulumi's
// "query mode".
//
// The monitor is registered as the ResourceMonitor service on a gRPC server started through
// `rpcutil.Serve` with port argument 0. On success the returned monitor records the address
// "127.0.0.1:<port>" for the port reported by `Serve`, together with the server's completion
// channel. Any error from `Serve` is returned as-is.
func newQueryResourceMonitor(builtins *builtinProvider) (*queryResmon, error) {
	// New up the monitor along with the channel used to cancel its server.
	mon := &queryResmon{
		builtins: builtins,
		cancel:   make(chan bool),
	}

	// Hook the monitor up as the ResourceMonitor implementation of the gRPC server.
	register := func(srv *grpc.Server) error {
		pulumirpc.RegisterResourceMonitorServer(srv, mon)
		return nil
	}

	// Fire up the gRPC server and start listening for incoming requests.
	port, done, err := rpcutil.Serve(0, mon.cancel, []func(*grpc.Server) error{register})
	if err != nil {
		return nil, err
	}

	mon.addr = fmt.Sprintf("127.0.0.1:%d", port)
	mon.done = done
	return mon, nil
}
// queryResmon is a pulumirpc.ResourceMonitor that is meant to run in Pulumi's "query mode". It
// performs two critical functions:
//
// 1. Disallows all resource operations. `queryResmon` intercepts all resource operations and
// returns an error instead of allowing them to proceed.
// 2. Services requests for stack snapshots. The only invoke it accepts is
// `readStackResourceOutputs`, which it forwards to the builtin provider so that queries can run
// across stack snapshots.
type queryResmon struct {
builtins *builtinProvider // provides builtins such as `getStack`.
addr string // the address the host is listening on.
cancel chan bool // a channel that can cancel the server; closed by `Cancel`.
done chan error // a channel that resolves when the server completes.
}
// Compile-time check that *queryResmon implements SourceResourceMonitor.
var _ SourceResourceMonitor = (*queryResmon)(nil)
// Address returns the address at which the monitor's RPC server may be reached. This is the
// "127.0.0.1:<port>" string recorded by `newQueryResourceMonitor`.
func (rm *queryResmon) Address() string {
return rm.addr
}
// Cancel signals that the monitor's RPC server should be terminated, awaits its termination, and
// returns any error that results.
//
// It closes the `cancel` channel that was handed to the server and then blocks until a value
// arrives on `done`.
func (rm *queryResmon) Cancel() error {
	// Ask the server to shut down.
	close(rm.cancel)

	// Wait for the server to report how it finished.
	err := <-rm.done
	return err
}
// Invoke performs an invocation of a member located in a resource provider.
//
// In query mode the only member that may be invoked is `readStackResourceOutputs`; every other
// token is rejected with an error. For the supported token the request arguments are unmarshaled
// (keeping unknowns), dispatched to the builtin provider, and the provider's return value and
// check failures are marshaled back into the response.
func (rm *queryResmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
	member := tokens.ModuleMember(req.GetTok())

	// Fail on all calls to `Invoke` except the one for reading stack resource outputs.
	if member != readStackResourceOutputs {
		return nil, fmt.Errorf("Query mode does not support invoke call for operation '%s'", member)
	}

	// The same marshaling options apply to both the incoming arguments and the outgoing result.
	opts := plugin.MarshalOptions{
		Label:        fmt.Sprintf("QueryResourceMonitor.Invoke(%s)", member),
		KeepUnknowns: true,
	}

	inputs, err := plugin.UnmarshalProperties(req.GetArgs(), opts)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to unmarshal %v args", member)
	}

	// Dispatch request for resource outputs to builtin provider.
	outputs, failures, err := rm.builtins.Invoke(member, inputs)
	if err != nil {
		return nil, errors.Wrapf(err, "invoke %s failed", member)
	}

	marshaled, err := plugin.MarshalProperties(outputs, opts)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to marshal return")
	}

	// Translate the provider's check failures into their RPC representation. The slice stays
	// nil when there are no failures.
	var rpcFailures []*pulumirpc.CheckFailure
	for _, f := range failures {
		rpcFailure := &pulumirpc.CheckFailure{
			Property: string(f.Property),
			Reason:   f.Reason,
		}
		rpcFailures = append(rpcFailures, rpcFailure)
	}

	resp := &pulumirpc.InvokeResponse{Return: marshaled, Failures: rpcFailures}
	return resp, nil
}
// ReadResource is the RPC a language process calls to read the state of an existing resource. In
// query mode it is not supported: the call always fails with an error and a nil response.
func (rm *queryResmon) ReadResource(ctx context.Context,
	req *pulumirpc.ReadResourceRequest) (*pulumirpc.ReadResourceResponse, error) {

	err := fmt.Errorf("Query mode does not support reading resources")
	return nil, err
}
// RegisterResource is invoked by a language process when a new resource has been allocated. In
// query mode it is not supported: the call always fails with an error and a nil response.
func (rm *queryResmon) RegisterResource(ctx context.Context,
	req *pulumirpc.RegisterResourceRequest) (*pulumirpc.RegisterResourceResponse, error) {

	err := fmt.Errorf("Query mode does not support creating, updating, or deleting resources")
	return nil, err
}
// RegisterResourceOutputs is the RPC a language process calls to record output properties for a
// resource after its initial provisioning. In query mode it is not supported: the call always
// fails with an error and a nil response, so nothing is recorded.
func (rm *queryResmon) RegisterResourceOutputs(ctx context.Context,
	req *pulumirpc.RegisterResourceOutputsRequest) (*pbempty.Empty, error) {

	err := fmt.Errorf("Query mode does not support registering resource operations")
	return nil, err
}
// SupportsFeature reports whether the query resource monitor supports the feature named by
// `req.Id`. Only "secrets" is supported: the monitor is able to have secrets passed to it, which
// may be arguments to invoke calls. Every other feature id is reported as unsupported. The call
// never returns an error.
func (rm *queryResmon) SupportsFeature(ctx context.Context,
	req *pulumirpc.SupportsFeatureRequest) (*pulumirpc.SupportsFeatureResponse, error) {

	supported := req.Id == "secrets"
	return &pulumirpc.SupportsFeatureResponse{HasSupport: supported}, nil
}