gRPC bridge: fix unknowns in Update previews (#6006)

These changes are a combination of three commits, each of which
contributes to the testing and/or fixing of a problem with marshaling
unknowns in `plugin.provider.Update` when `preview` is true.

## deploytest: add support for gRPC adapters. 

These changes add support for communicating with providers using the
gRPC adapters to the deploytest pacakage. This makes it easier to test
the gRPC adapters across typical lifecycle patterns.

Supporting these changes are two additions to the `resource/plugin`
package:

1. A type that bridges between the `plugin.Provider` interface and the
  `pulumirpc.ResourceProviderServer`
2. A function to create a `plugin.Provider` given a
  `pulumirpc.ResourceProviderClient`

The deploytest package uses these to wrap an in-process
`plugin.Provider` in a gRPC interface and connect to it without using
the default plugin host, respectively.

## pulumi_test: test provider preview over gRPC.

Add a test that runs the provider preview lifecycle, but using a
provider that communicates over gRPC.

## gRPC bridge: fix unknowns in `Update` previews

Set the `KeepUnknowns` and `RejectUnknowns` bits in the `MarshalOptions`
used when unmarshaling update results to preserve unknowns during a
preview and reject them otherwise.

These changes also set the `RejectUnknowns` bit in the `MarshalOptions`
used by `Create` if `preview` is false, and fix a bug in the array
unmarshaler that could cause out-of-bounds accesses.

Fixes https://github.com/pulumi/pulumi/issues/6004.
This commit is contained in:
Pat Gavlin 2020-12-23 13:25:48 -08:00 committed by GitHub
parent 782e1bd6e9
commit c6d22a25e5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 777 additions and 31 deletions

View file

@ -3,6 +3,10 @@ CHANGELOG
## HEAD (Unreleased)
- Fix a bug in the core engine that could cause previews to fail if a resource with changes had
unknown output property values.
[#6006](https://github.com/pulumi/pulumi/pull/6006)
## 2.16.1 (2020-12-22)
- Fix a panic due to unsafe concurrent map access.

View file

@ -17,7 +17,9 @@ package lifecycletest
import (
"context"
"flag"
"fmt"
"os"
"strings"
"sync"
"testing"
@ -88,6 +90,18 @@ func pickURN(t *testing.T, urns []resource.URN, names []string, target string) r
return ""
}
func TestMain(m *testing.M) {
grpcDefault := flag.Bool("grpc-providers", false, "enable or disable gRPC providers by default")
flag.Parse()
if *grpcDefault {
deploytest.UseGrpcProvidersByDefault = true
}
os.Exit(m.Run())
}
func TestEmptyProgramLifecycle(t *testing.T) {
program := deploytest.NewLanguageRuntime(func(_ plugin.RunInfo, _ *deploytest.ResourceMonitor) error {
return nil
@ -1634,6 +1648,91 @@ func TestProviderPreview(t *testing.T) {
assert.True(t, sawPreview)
}
func TestProviderPreviewGrpc(t *testing.T) {
sawPreview := false
loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{
CreateF: func(urn resource.URN, news resource.PropertyMap, timeout float64,
preview bool) (resource.ID, resource.PropertyMap, resource.Status, error) {
if preview {
sawPreview = true
}
assert.Equal(t, preview, news.ContainsUnknowns())
return "created-id", news, resource.StatusOK, nil
},
UpdateF: func(urn resource.URN, id resource.ID, olds, news resource.PropertyMap, timeout float64,
ignoreChanges []string, preview bool) (resource.PropertyMap, resource.Status, error) {
if preview {
sawPreview = true
}
assert.Equal(t, preview, news.ContainsUnknowns())
return news, resource.StatusOK, nil
},
}, nil
}, deploytest.WithGrpc),
}
preview := true
program := deploytest.NewLanguageRuntime(func(_ plugin.RunInfo, monitor *deploytest.ResourceMonitor) error {
computed := interface{}(resource.Computed{Element: resource.NewStringProperty("")})
if !preview {
computed = "alpha"
}
ins := resource.NewPropertyMapFromMap(map[string]interface{}{
"foo": "bar",
"baz": map[string]interface{}{
"a": 42,
"b": computed,
},
"qux": []interface{}{
computed,
24,
},
"zed": computed,
})
_, _, state, err := monitor.RegisterResource("pkgA:m:typA", "resA", true, deploytest.ResourceOptions{
Inputs: ins,
})
assert.NoError(t, err)
assert.True(t, state.DeepEquals(ins))
return nil
})
host := deploytest.NewPluginHost(nil, nil, program, loaders...)
p := &TestPlan{
Options: UpdateOptions{Host: host},
}
project := p.GetProject()
// Run a preview. The inputs should be propagated to the outputs by the provider during the create.
preview, sawPreview = true, false
_, res := TestOp(Update).Run(project, p.GetTarget(nil), p.Options, preview, p.BackendClient, nil)
assert.Nil(t, res)
assert.True(t, sawPreview)
// Run an update.
preview, sawPreview = false, false
snap, res := TestOp(Update).Run(project, p.GetTarget(nil), p.Options, preview, p.BackendClient, nil)
assert.Nil(t, res)
assert.False(t, sawPreview)
// Run another preview. The inputs should be propagated to the outputs during the update.
preview, sawPreview = true, false
_, res = TestOp(Update).Run(project, p.GetTarget(snap), p.Options, preview, p.BackendClient, nil)
assert.Nil(t, res)
assert.True(t, sawPreview)
}
func TestSingleComponentDefaultProviderLifecycle(t *testing.T) {
loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {

View file

@ -17,6 +17,7 @@ package deploytest
import (
"context"
"fmt"
"io"
"sync"
"github.com/blang/semver"
@ -28,37 +29,103 @@ import (
"github.com/pulumi/pulumi/sdk/v2/go/common/resource"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v2/go/common/tokens"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v2/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v2/go/common/workspace"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
var UseGrpcProvidersByDefault = false
type LoadProviderFunc func() (plugin.Provider, error)
type LoadProviderWithHostFunc func(host plugin.Host) (plugin.Provider, error)
type ProviderOption func(p *ProviderLoader)
func WithoutGrpc(p *ProviderLoader) {
p.useGRPC = false
}
func WithGrpc(p *ProviderLoader) {
p.useGRPC = true
}
type ProviderLoader struct {
pkg tokens.Package
version semver.Version
load LoadProviderFunc
loadWithHost LoadProviderWithHostFunc
useGRPC bool
}
func NewProviderLoader(pkg tokens.Package, version semver.Version, load LoadProviderFunc) *ProviderLoader {
return &ProviderLoader{
func NewProviderLoader(pkg tokens.Package, version semver.Version, load LoadProviderFunc,
opts ...ProviderOption) *ProviderLoader {
p := &ProviderLoader{
pkg: pkg,
version: version,
load: load,
useGRPC: UseGrpcProvidersByDefault,
}
for _, o := range opts {
o(p)
}
return p
}
func NewProviderLoaderWithHost(pkg tokens.Package, version semver.Version,
load LoadProviderWithHostFunc) *ProviderLoader {
load LoadProviderWithHostFunc, opts ...ProviderOption) *ProviderLoader {
return &ProviderLoader{
p := &ProviderLoader{
pkg: pkg,
version: version,
loadWithHost: load,
useGRPC: UseGrpcProvidersByDefault,
}
for _, o := range opts {
o(p)
}
return p
}
type nopCloserT int
func (nopCloserT) Close() error { return nil }
var nopCloser io.Closer = nopCloserT(0)
type grpcWrapper struct {
stop chan bool
}
func (w *grpcWrapper) Close() error {
go func() { w.stop <- true }()
return nil
}
func wrapProviderWithGrpc(provider plugin.Provider) (plugin.Provider, io.Closer, error) {
wrapper := &grpcWrapper{stop: make(chan bool)}
port, _, err := rpcutil.Serve(0, wrapper.stop, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
pulumirpc.RegisterResourceProviderServer(srv, plugin.NewProviderServer(provider))
return nil
},
}, nil)
if err != nil {
return nil, nil, fmt.Errorf("could not start resource provider service: %w", err)
}
conn, err := grpc.Dial(
fmt.Sprintf("127.0.0.1:%v", port),
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
rpcutil.GrpcChannelOptions(),
)
if err != nil {
contract.IgnoreClose(wrapper)
return nil, nil, fmt.Errorf("could not connect to resource provider service: %v", err)
}
wrapped := plugin.NewProviderWithClient(nil, provider.Pkg(), pulumirpc.NewResourceProviderClient(conn), false)
return wrapped, wrapper, nil
}
type hostEngine struct {
@ -108,7 +175,7 @@ type pluginHost struct {
engine *hostEngine
providers map[plugin.Provider]struct{}
providers map[plugin.Provider]io.Closer
closed bool
m sync.Mutex
}
@ -138,7 +205,7 @@ func NewPluginHost(sink, statusSink diag.Sink, languageRuntime plugin.LanguageRu
sink: sink,
statusSink: statusSink,
engine: engine,
providers: make(map[plugin.Provider]struct{}),
providers: map[plugin.Provider]io.Closer{},
}
}
@ -180,10 +247,18 @@ func (host *pluginHost) Provider(pkg tokens.Package, version *semver.Version) (p
return nil, err
}
closer := nopCloser
if best.useGRPC {
prov, closer, err = wrapProviderWithGrpc(prov)
if err != nil {
return nil, err
}
}
host.m.Lock()
defer host.m.Unlock()
host.providers[prov] = struct{}{}
host.providers[prov] = closer
return prov, nil
}
@ -207,9 +282,16 @@ func (host *pluginHost) Close() error {
host.m.Lock()
defer host.m.Unlock()
var err error
for _, closer := range host.providers {
if pErr := closer.Close(); pErr != nil {
err = pErr
}
}
go func() { host.engine.stop <- true }()
host.closed = true
return nil
return err
}
func (host *pluginHost) ServerAddr() string {
return host.engine.address

View file

@ -15,6 +15,7 @@
package plugin
import (
"errors"
"io"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource"
@ -103,6 +104,9 @@ type CheckFailure struct {
Reason string // the reason the property failed to check.
}
// ErrNotYetImplemented may be returned from a provider for optional methods that are not yet implemented.
var ErrNotYetImplemented = errors.New("NYI")
// DiffChanges represents the kind of changes detected by a diff operation.
type DiffChanges int

View file

@ -15,6 +15,7 @@
package plugin
import (
"context"
"encoding/json"
"fmt"
"io"
@ -106,6 +107,18 @@ func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Ve
}, nil
}
func NewProviderWithClient(ctx *Context, pkg tokens.Package, client pulumirpc.ResourceProviderClient,
disableProviderPreview bool) Provider {
return &provider{
ctx: ctx,
pkg: pkg,
clientRaw: client,
cfgdone: make(chan bool),
disableProviderPreview: disableProviderPreview,
}
}
func (p *provider) Pkg() tokens.Package { return p.pkg }
// label returns a base label for tracing functions.
@ -113,6 +126,13 @@ func (p *provider) label() string {
return fmt.Sprintf("Provider[%s, %p]", p.pkg, p)
}
func (p *provider) requestContext() context.Context {
if p.ctx == nil {
return context.Background()
}
return p.ctx.Request()
}
// isDiffCheckConfigLogicallyUnimplemented returns true when an rpcerror.Error should be treated as if it was an error
// due to a rpc being unimplemented. Due to past mistakes, different providers returned "Unimplemented" in a variaity of
// different ways that don't always result in an Uimplemented error code.
@ -140,7 +160,7 @@ func isDiffCheckConfigLogicallyUnimplemented(err *rpcerror.Error, providerType t
// GetSchema fetches the schema for this resource provider, if any.
func (p *provider) GetSchema(version int) ([]byte, error) {
resp, err := p.clientRaw.GetSchema(p.ctx.Request(), &pulumirpc.GetSchemaRequest{
resp, err := p.clientRaw.GetSchema(p.requestContext(), &pulumirpc.GetSchemaRequest{
Version: int32(version),
})
if err != nil {
@ -175,7 +195,7 @@ func (p *provider) CheckConfig(urn resource.URN, olds,
return nil, nil, err
}
resp, err := p.clientRaw.CheckConfig(p.ctx.Request(), &pulumirpc.CheckRequest{
resp, err := p.clientRaw.CheckConfig(p.requestContext(), &pulumirpc.CheckRequest{
Urn: string(urn),
Olds: molds,
News: mnews,
@ -279,7 +299,7 @@ func (p *provider) DiffConfig(urn resource.URN, olds, news resource.PropertyMap,
return DiffResult{}, err
}
resp, err := p.clientRaw.DiffConfig(p.ctx.Request(), &pulumirpc.DiffRequest{
resp, err := p.clientRaw.DiffConfig(p.requestContext(), &pulumirpc.DiffRequest{
Urn: string(urn),
Olds: molds,
News: mnews,
@ -472,7 +492,7 @@ func (p *provider) Configure(inputs resource.PropertyMap) error {
// Spawn the configure to happen in parallel. This ensures that we remain responsive elsewhere that might
// want to make forward progress, even as the configure call is happening.
go func() {
resp, err := p.clientRaw.Configure(p.ctx.Request(), &pulumirpc.ConfigureRequest{
resp, err := p.clientRaw.Configure(p.requestContext(), &pulumirpc.ConfigureRequest{
AcceptSecrets: true,
AcceptResources: true,
Variables: config,
@ -532,7 +552,7 @@ func (p *provider) Check(urn resource.URN,
return nil, nil, err
}
resp, err := client.Check(p.ctx.Request(), &pulumirpc.CheckRequest{
resp, err := client.Check(p.requestContext(), &pulumirpc.CheckRequest{
Urn: string(urn),
Olds: molds,
News: mnews,
@ -624,7 +644,7 @@ func (p *provider) Diff(urn resource.URN, id resource.ID,
return DiffResult{}, err
}
resp, err := client.Diff(p.ctx.Request(), &pulumirpc.DiffRequest{
resp, err := client.Diff(p.requestContext(), &pulumirpc.DiffRequest{
Id: string(id),
Urn: string(urn),
Olds: molds,
@ -706,7 +726,7 @@ func (p *provider) Create(urn resource.URN, props resource.PropertyMap, timeout
var liveObject *_struct.Struct
var resourceError error
var resourceStatus = resource.StatusOK
resp, err := client.Create(p.ctx.Request(), &pulumirpc.CreateRequest{
resp, err := client.Create(p.requestContext(), &pulumirpc.CreateRequest{
Urn: string(urn),
Properties: mprops,
Timeout: timeout,
@ -731,10 +751,11 @@ func (p *provider) Create(urn resource.URN, props resource.PropertyMap, timeout
}
outs, err := UnmarshalProperties(liveObject, MarshalOptions{
Label: fmt.Sprintf("%s.outputs", label),
KeepUnknowns: preview,
KeepSecrets: true,
KeepResources: true,
Label: fmt.Sprintf("%s.outputs", label),
RejectUnknowns: !preview,
KeepUnknowns: preview,
KeepSecrets: true,
KeepResources: true,
})
if err != nil {
return "", nil, resourceStatus, err
@ -809,7 +830,7 @@ func (p *provider) Read(urn resource.URN, id resource.ID,
var liveInputs *_struct.Struct
var resourceError error
var resourceStatus = resource.StatusOK
resp, err := client.Read(p.ctx.Request(), &pulumirpc.ReadRequest{
resp, err := client.Read(p.requestContext(), &pulumirpc.ReadRequest{
Id: string(id),
Urn: string(urn),
Properties: mstate,
@ -927,7 +948,7 @@ func (p *provider) Update(urn resource.URN, id resource.ID,
var liveObject *_struct.Struct
var resourceError error
var resourceStatus = resource.StatusOK
resp, err := client.Update(p.ctx.Request(), &pulumirpc.UpdateRequest{
resp, err := client.Update(p.requestContext(), &pulumirpc.UpdateRequest{
Id: string(id),
Urn: string(urn),
Olds: molds,
@ -950,7 +971,8 @@ func (p *provider) Update(urn resource.URN, id resource.ID,
outs, err := UnmarshalProperties(liveObject, MarshalOptions{
Label: fmt.Sprintf("%s.outputs", label),
RejectUnknowns: true,
RejectUnknowns: !preview,
KeepUnknowns: preview,
KeepSecrets: true,
KeepResources: true,
})
@ -1000,7 +1022,7 @@ func (p *provider) Delete(urn resource.URN, id resource.ID, props resource.Prope
// We should only be calling {Create,Update,Delete} if the provider is fully configured.
contract.Assert(p.cfgknown)
if _, err := client.Delete(p.ctx.Request(), &pulumirpc.DeleteRequest{
if _, err := client.Delete(p.requestContext(), &pulumirpc.DeleteRequest{
Id: string(id),
Urn: string(urn),
Properties: mprops,
@ -1079,7 +1101,7 @@ func (p *provider) Construct(info ConstructInfo, typ tokens.Type, name tokens.QN
config[k.String()] = v
}
resp, err := client.Construct(p.ctx.Request(), &pulumirpc.ConstructRequest{
resp, err := client.Construct(p.requestContext(), &pulumirpc.ConstructRequest{
Project: info.Project,
Stack: info.Stack,
Config: config,
@ -1155,7 +1177,7 @@ func (p *provider) Invoke(tok tokens.ModuleMember, args resource.PropertyMap) (r
return nil, nil, err
}
resp, err := client.Invoke(p.ctx.Request(), &pulumirpc.InvokeRequest{
resp, err := client.Invoke(p.requestContext(), &pulumirpc.InvokeRequest{
Tok: string(tok),
Args: margs,
AcceptResources: p.acceptResources,
@ -1220,7 +1242,7 @@ func (p *provider) StreamInvoke(
}
streamClient, err := client.StreamInvoke(
p.ctx.Request(), &pulumirpc.InvokeRequest{
p.requestContext(), &pulumirpc.InvokeRequest{
Tok: string(tok),
Args: margs,
AcceptResources: p.acceptResources,
@ -1275,7 +1297,7 @@ func (p *provider) GetPluginInfo() (workspace.PluginInfo, error) {
// Calling GetPluginInfo happens immediately after loading, and does not require configuration to proceed.
// Thus, we access the clientRaw property, rather than calling getClient.
resp, err := p.clientRaw.GetPluginInfo(p.ctx.Request(), &pbempty.Empty{})
resp, err := p.clientRaw.GetPluginInfo(p.requestContext(), &pbempty.Empty{})
if err != nil {
rpcError := rpcerror.Convert(err)
logging.V(7).Infof("%s failed: err=%v", label, rpcError.Message())
@ -1291,16 +1313,21 @@ func (p *provider) GetPluginInfo() (workspace.PluginInfo, error) {
version = &sv
}
path := ""
if p.plug != nil {
path = p.plug.Bin
}
return workspace.PluginInfo{
Name: string(p.pkg),
Path: p.plug.Bin,
Path: path,
Kind: workspace.ResourcePlugin,
Version: version,
}, nil
}
func (p *provider) SignalCancellation() error {
_, err := p.clientRaw.Cancel(p.ctx.Request(), &pbempty.Empty{})
_, err := p.clientRaw.Cancel(p.requestContext(), &pbempty.Empty{})
if err != nil {
rpcError := rpcerror.Convert(err)
logging.V(8).Infof("provider received rpc error `%s`: `%s`", rpcError.Code(),
@ -1317,6 +1344,9 @@ func (p *provider) SignalCancellation() error {
// Close tears down the underlying plugin RPC connection and process.
func (p *provider) Close() error {
if p.plug == nil {
return nil
}
return p.plug.Close()
}

View file

@ -0,0 +1,528 @@
// Copyright 2016-2020, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package plugin
import (
"context"
"encoding/json"
"fmt"
pbempty "github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource"
"github.com/pulumi/pulumi/sdk/v2/go/common/resource/config"
"github.com/pulumi/pulumi/sdk/v2/go/common/tokens"
pulumirpc "github.com/pulumi/pulumi/sdk/v2/proto/go"
)
type providerServer struct {
provider Provider
keepSecrets bool
keepResources bool
}
func NewProviderServer(provider Provider) pulumirpc.ResourceProviderServer {
return &providerServer{provider: provider}
}
func (p *providerServer) unmarshalOptions(label string) MarshalOptions {
return MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
}
}
func (p *providerServer) marshalOptions(label string) MarshalOptions {
return MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: p.keepSecrets,
KeepResources: p.keepResources,
}
}
func (p *providerServer) checkNYI(method string, err error) error {
if err == ErrNotYetImplemented {
return status.Error(codes.Unimplemented, fmt.Sprintf("%v is not yet implemented", method))
}
return err
}
func (p *providerServer) marshalDiff(diff DiffResult) (*pulumirpc.DiffResponse, error) {
changes := pulumirpc.DiffResponse_DIFF_UNKNOWN
switch diff.Changes {
case DiffNone:
changes = pulumirpc.DiffResponse_DIFF_NONE
case DiffSome:
changes = pulumirpc.DiffResponse_DIFF_SOME
}
// Infer the result from the detailed diff.
var diffs, replaces []string
var detailedDiff map[string]*pulumirpc.PropertyDiff
if len(diff.DetailedDiff) == 0 {
diffs = make([]string, len(diff.ChangedKeys))
for i, k := range diff.ChangedKeys {
diffs[i] = string(k)
}
replaces = make([]string, len(diff.ReplaceKeys))
for i, k := range diff.ReplaceKeys {
replaces[i] = string(k)
}
} else {
changes = pulumirpc.DiffResponse_DIFF_SOME
detailedDiff = make(map[string]*pulumirpc.PropertyDiff)
for path, diff := range diff.DetailedDiff {
diffs = append(diffs, path)
var kind pulumirpc.PropertyDiff_Kind
switch diff.Kind {
case DiffAdd:
kind = pulumirpc.PropertyDiff_ADD
case DiffAddReplace:
kind, replaces = pulumirpc.PropertyDiff_ADD_REPLACE, append(replaces, path)
case DiffDelete:
kind = pulumirpc.PropertyDiff_DELETE
case DiffDeleteReplace:
kind, replaces = pulumirpc.PropertyDiff_DELETE, append(replaces, path)
case DiffUpdate:
kind = pulumirpc.PropertyDiff_UPDATE
case DiffUpdateReplace:
kind, replaces = pulumirpc.PropertyDiff_UPDATE_REPLACE, append(replaces, path)
}
detailedDiff[path] = &pulumirpc.PropertyDiff{
Kind: kind,
InputDiff: diff.InputDiff,
}
}
}
return &pulumirpc.DiffResponse{
Replaces: replaces,
DeleteBeforeReplace: diff.DeleteBeforeReplace,
Changes: changes,
Diffs: diffs,
DetailedDiff: detailedDiff,
}, nil
}
func (p *providerServer) GetSchema(ctx context.Context,
req *pulumirpc.GetSchemaRequest) (*pulumirpc.GetSchemaResponse, error) {
schema, err := p.provider.GetSchema(int(req.GetVersion()))
if err != nil {
return nil, err
}
return &pulumirpc.GetSchemaResponse{Schema: string(schema)}, nil
}
func (p *providerServer) GetPluginInfo(ctx context.Context, req *pbempty.Empty) (*pulumirpc.PluginInfo, error) {
info, err := p.provider.GetPluginInfo()
if err != nil {
return nil, err
}
return &pulumirpc.PluginInfo{Version: info.Version.String()}, nil
}
func (p *providerServer) Cancel(ctx context.Context, req *pbempty.Empty) (*pbempty.Empty, error) {
if err := p.provider.SignalCancellation(); err != nil {
return nil, err
}
return &pbempty.Empty{}, nil
}
func (p *providerServer) CheckConfig(ctx context.Context,
req *pulumirpc.CheckRequest) (*pulumirpc.CheckResponse, error) {
urn := resource.URN(req.GetUrn())
state, err := UnmarshalProperties(req.GetOlds(), p.unmarshalOptions("olds"))
if err != nil {
return nil, err
}
inputs, err := UnmarshalProperties(req.GetNews(), p.unmarshalOptions("news"))
if err != nil {
return nil, err
}
newInputs, failures, err := p.provider.CheckConfig(urn, state, inputs, true)
if err != nil {
return nil, p.checkNYI("CheckConfig", err)
}
rpcInputs, err := MarshalProperties(newInputs, p.marshalOptions("inputs"))
if err != nil {
return nil, err
}
rpcFailures := make([]*pulumirpc.CheckFailure, len(failures))
for i, f := range failures {
rpcFailures[i] = &pulumirpc.CheckFailure{Property: string(f.Property), Reason: f.Reason}
}
return &pulumirpc.CheckResponse{Inputs: rpcInputs, Failures: rpcFailures}, nil
}
func (p *providerServer) DiffConfig(ctx context.Context, req *pulumirpc.DiffRequest) (*pulumirpc.DiffResponse, error) {
urn := resource.URN(req.GetUrn())
state, err := UnmarshalProperties(req.GetOlds(), p.unmarshalOptions("state"))
if err != nil {
return nil, err
}
inputs, err := UnmarshalProperties(req.GetNews(), p.unmarshalOptions("inputs"))
if err != nil {
return nil, err
}
diff, err := p.provider.DiffConfig(urn, state, inputs, true, req.GetIgnoreChanges())
if err != nil {
return nil, p.checkNYI("DiffConfig", err)
}
return p.marshalDiff(diff)
}
func (p *providerServer) Configure(ctx context.Context,
req *pulumirpc.ConfigureRequest) (*pulumirpc.ConfigureResponse, error) {
var inputs resource.PropertyMap
if req.GetArgs() != nil {
args, err := UnmarshalProperties(req.GetArgs(), p.unmarshalOptions("args"))
if err != nil {
return nil, err
}
inputs = args
} else {
for k, v := range req.GetVariables() {
key, err := config.ParseKey(k)
if err != nil {
return nil, err
}
var value interface{}
if err = json.Unmarshal([]byte(v), &value); err != nil {
// If we couldn't unmarshal a JSON value, just pass the raw string through.
value = v
}
inputs[resource.PropertyKey(key.Name())] = resource.NewPropertyValue(value)
}
}
if err := p.provider.Configure(inputs); err != nil {
return nil, err
}
p.keepSecrets = req.GetAcceptSecrets()
p.keepResources = req.GetAcceptResources()
return &pulumirpc.ConfigureResponse{AcceptSecrets: true, SupportsPreview: true, AcceptResources: true}, nil
}
func (p *providerServer) Check(ctx context.Context, req *pulumirpc.CheckRequest) (*pulumirpc.CheckResponse, error) {
urn := resource.URN(req.GetUrn())
state, err := UnmarshalProperties(req.GetOlds(), p.unmarshalOptions("state"))
if err != nil {
return nil, err
}
inputs, err := UnmarshalProperties(req.GetNews(), p.unmarshalOptions("inputs"))
if err != nil {
return nil, err
}
newInputs, failures, err := p.provider.Check(urn, state, inputs, true)
if err != nil {
return nil, err
}
rpcInputs, err := MarshalProperties(newInputs, p.marshalOptions("newInputs"))
if err != nil {
return nil, err
}
rpcFailures := make([]*pulumirpc.CheckFailure, len(failures))
for i, f := range failures {
rpcFailures[i] = &pulumirpc.CheckFailure{Property: string(f.Property), Reason: f.Reason}
}
return &pulumirpc.CheckResponse{Inputs: rpcInputs, Failures: rpcFailures}, nil
}
func (p *providerServer) Diff(ctx context.Context, req *pulumirpc.DiffRequest) (*pulumirpc.DiffResponse, error) {
urn, id := resource.URN(req.GetUrn()), resource.ID(req.GetId())
state, err := UnmarshalProperties(req.GetOlds(), p.unmarshalOptions("state"))
if err != nil {
return nil, err
}
inputs, err := UnmarshalProperties(req.GetNews(), p.unmarshalOptions("inputs"))
if err != nil {
return nil, err
}
diff, err := p.provider.Diff(urn, id, state, inputs, true, req.GetIgnoreChanges())
if err != nil {
return nil, err
}
return p.marshalDiff(diff)
}
func (p *providerServer) Create(ctx context.Context, req *pulumirpc.CreateRequest) (*pulumirpc.CreateResponse, error) {
urn := resource.URN(req.GetUrn())
inputs, err := UnmarshalProperties(req.GetProperties(), p.unmarshalOptions("inputs"))
if err != nil {
return nil, err
}
id, state, _, err := p.provider.Create(urn, inputs, req.GetTimeout(), req.GetPreview())
if err != nil {
return nil, err
}
rpcState, err := MarshalProperties(state, p.marshalOptions("newState"))
if err != nil {
return nil, err
}
return &pulumirpc.CreateResponse{
Id: string(id),
Properties: rpcState,
}, nil
}
func (p *providerServer) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*pulumirpc.ReadResponse, error) {
urn, id := resource.URN(req.GetUrn()), resource.ID(req.GetId())
state, err := UnmarshalProperties(req.GetProperties(), p.unmarshalOptions("state"))
if err != nil {
return nil, err
}
inputs, err := UnmarshalProperties(req.GetInputs(), p.unmarshalOptions("inputs"))
if err != nil {
return nil, err
}
result, _, err := p.provider.Read(urn, id, inputs, state)
if err != nil {
return nil, err
}
rpcState, err := MarshalProperties(result.Outputs, p.marshalOptions("newState"))
if err != nil {
return nil, err
}
rpcInputs, err := MarshalProperties(result.Inputs, p.marshalOptions("newInputs"))
if err != nil {
return nil, err
}
return &pulumirpc.ReadResponse{
Id: string(id),
Properties: rpcState,
Inputs: rpcInputs,
}, nil
}
func (p *providerServer) Update(ctx context.Context, req *pulumirpc.UpdateRequest) (*pulumirpc.UpdateResponse, error) {
urn, id := resource.URN(req.GetUrn()), resource.ID(req.GetId())
state, err := UnmarshalProperties(req.GetOlds(), p.unmarshalOptions("state"))
if err != nil {
return nil, err
}
inputs, err := UnmarshalProperties(req.GetNews(), p.unmarshalOptions("inputs"))
if err != nil {
return nil, err
}
newState, _, err := p.provider.Update(urn, id, state, inputs, req.GetTimeout(), req.GetIgnoreChanges(),
req.GetPreview())
if err != nil {
return nil, err
}
rpcState, err := MarshalProperties(newState, p.marshalOptions("newState"))
if err != nil {
return nil, err
}
return &pulumirpc.UpdateResponse{Properties: rpcState}, nil
}
func (p *providerServer) Delete(ctx context.Context, req *pulumirpc.DeleteRequest) (*pbempty.Empty, error) {
urn, id := resource.URN(req.GetUrn()), resource.ID(req.GetId())
state, err := UnmarshalProperties(req.GetProperties(), p.unmarshalOptions("state"))
if err != nil {
return nil, err
}
if _, err = p.provider.Delete(urn, id, state, req.GetTimeout()); err != nil {
return nil, err
}
return &pbempty.Empty{}, nil
}
func (p *providerServer) Construct(ctx context.Context,
req *pulumirpc.ConstructRequest) (*pulumirpc.ConstructResponse, error) {
typ, name, parent := tokens.Type(req.GetType()), tokens.QName(req.GetName()), resource.URN(req.GetParent())
inputs, err := UnmarshalProperties(req.GetInputs(), p.unmarshalOptions("inputs"))
if err != nil {
return nil, err
}
cfg := map[config.Key]string{}
for k, v := range req.GetConfig() {
configKey, err := config.ParseKey(k)
if err != nil {
return nil, err
}
cfg[configKey] = v
}
info := ConstructInfo{
Project: req.GetProject(),
Stack: req.GetStack(),
Config: cfg,
DryRun: req.GetDryRun(),
Parallel: int(req.GetParallel()),
MonitorAddress: req.GetMonitorEndpoint(),
}
aliases := make([]resource.URN, len(req.GetAliases()))
for i, urn := range req.GetAliases() {
aliases[i] = resource.URN(urn)
}
dependencies := make([]resource.URN, len(req.GetDependencies()))
for i, urn := range req.GetAliases() {
dependencies[i] = resource.URN(urn)
}
propertyDependencies := map[resource.PropertyKey][]resource.URN{}
for name, deps := range req.GetInputDependencies() {
urns := make([]resource.URN, len(deps.Urns))
for i, urn := range deps.Urns {
urns[i] = resource.URN(urn)
}
propertyDependencies[resource.PropertyKey(name)] = urns
}
options := ConstructOptions{
Aliases: aliases,
Dependencies: dependencies,
Protect: req.GetProtect(),
Providers: req.GetProviders(),
PropertyDependencies: propertyDependencies,
}
result, err := p.provider.Construct(info, typ, name, parent, inputs, options)
if err != nil {
return nil, err
}
outputs, err := MarshalProperties(result.Outputs, p.marshalOptions("outputs"))
if err != nil {
return nil, err
}
outputDependencies := map[string]*pulumirpc.ConstructResponse_PropertyDependencies{}
for name, deps := range result.OutputDependencies {
urns := make([]string, len(deps))
for i, urn := range deps {
urns[i] = string(urn)
}
outputDependencies[string(name)] = &pulumirpc.ConstructResponse_PropertyDependencies{Urns: urns}
}
return &pulumirpc.ConstructResponse{
Urn: string(result.URN),
State: outputs,
StateDependencies: outputDependencies,
}, nil
}
func (p *providerServer) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
args, err := UnmarshalProperties(req.GetArgs(), p.unmarshalOptions("args"))
if err != nil {
return nil, err
}
result, failures, err := p.provider.Invoke(tokens.ModuleMember(req.GetTok()), args)
if err != nil {
return nil, err
}
rpcResult, err := MarshalProperties(result, p.marshalOptions("result"))
if err != nil {
return nil, err
}
rpcFailures := make([]*pulumirpc.CheckFailure, len(failures))
for i, f := range failures {
rpcFailures[i] = &pulumirpc.CheckFailure{Property: string(f.Property), Reason: f.Reason}
}
return &pulumirpc.InvokeResponse{
Return: rpcResult,
Failures: rpcFailures,
}, nil
}
func (p *providerServer) StreamInvoke(req *pulumirpc.InvokeRequest,
server pulumirpc.ResourceProvider_StreamInvokeServer) error {
args, err := UnmarshalProperties(req.GetArgs(), p.unmarshalOptions("args"))
if err != nil {
return err
}
failures, err := p.provider.StreamInvoke(tokens.ModuleMember(req.GetTok()), args,
func(item resource.PropertyMap) error {
rpcItem, err := MarshalProperties(item, p.marshalOptions("item"))
if err != nil {
return err
}
return server.Send(&pulumirpc.InvokeResponse{Return: rpcItem})
})
if err != nil {
return err
}
if len(failures) == 0 {
return nil
}
rpcFailures := make([]*pulumirpc.CheckFailure, len(failures))
for i, f := range failures {
rpcFailures[i] = &pulumirpc.CheckFailure{Property: string(f.Property), Reason: f.Reason}
}
return server.Send(&pulumirpc.InvokeResponse{Failures: rpcFailures})
}

View file

@ -286,9 +286,8 @@ func UnmarshalPropertyValue(v *structpb.Value, opts MarshalOptions) (*resource.P
m := resource.NewStringProperty(s)
return &m, nil
case *structpb.Value_ListValue:
// If there's already an array, prefer to swap elements within it.
var elems []resource.PropertyValue
lst := v.GetListValue()
elems := make([]resource.PropertyValue, len(lst.GetValues()))
for i, elem := range lst.GetValues() {
e, err := UnmarshalPropertyValue(elem, opts)
if err != nil {