pulumi/pkg/resource/plugin/host.go
Pat Gavlin a222705143
Implement first-class providers. (#1695)
### First-Class Providers
These changes implement support for first-class providers. First-class
providers are provider plugins that are exposed as resources via the
Pulumi programming model so that they may be explicitly and multiply
instantiated. Each instance of a provider resource may be configured
differently, and configuration parameters may be source from the
outputs of other resources.

### Provider Plugin Changes
In order to accommodate the need to verify and diff provider
configuration and configure providers without complete configuration
information, these changes adjust the high-level provider plugin
interface. Two new methods for validating a provider's configuration
and diffing changes to the same have been added (`CheckConfig` and
`DiffConfig`, respectively), and the type of the configuration bag
accepted by `Configure` has been changed to a `PropertyMap`.

These changes have not yet been reflected in the provider plugin gRPC
interface. We will do this in a set of follow-up changes. Until then,
these methods are implemented by adapters:
- `CheckConfig` validates that all configuration parameters are string
  or unknown properties. This is necessary because existing plugins
  only accept string-typed configuration values.
- `DiffConfig` either returns "never replace" if all configuration
  values are known or "must replace" if any configuration value is
  unknown. The justification for this behavior is given
  [here](https://github.com/pulumi/pulumi/pull/1695/files#diff-a6cd5c7f337665f5bb22e92ca5f07537R106)
- `Configure` converts the config bag to a legacy config map and
  configures the provider plugin if all config values are known. If any
  config value is unknown, the underlying plugin is not configured and
  the provider may only perform `Check`, `Read`, and `Invoke`, all of
  which return empty results. We justify this behavior becuase it is
  only possible during a preview and provides the best experience we
  can manage with the existing gRPC interface.

### Resource Model Changes
Providers are now exposed as resources that participate in a stack's
dependency graph. Like other resources, they are explicitly created,
may have multiple instances, and may have dependencies on other
resources. Providers are referred to using provider references, which
are a combination of the provider's URN and its ID. This design
addresses the need during a preview to refer to providers that have not
yet been physically created and therefore have no ID.

All custom resources that are not themselves providers must specify a
single provider via a provider reference. The named provider will be
used to manage that resource's CRUD operations. If a resource's
provider reference changes, the resource must be replaced. Though its
URN is not present in the resource's dependency list, the provider
should be treated as a dependency of the resource when topologically
sorting the dependency graph.

Finally, `Invoke` operations must now specify a provider to use for the
invocation via a provider reference.

### Engine Changes
First-class providers support requires a few changes to the engine:
- The engine must have some way to map from provider references to
  provider plugins. It must be possible to add providers from a stack's
  checkpoint to this map and to register new/updated providers during
  the execution of a plan in response to CRUD operations on provider
  resources.
- In order to support updating existing stacks using existing Pulumi
  programs that may not explicitly instantiate providers, the engine
  must be able to manage the "default" providers for each package
  referenced by a checkpoint or Pulumi program. The configuration for
  a "default" provider is taken from the stack's configuration data.

The former need is addressed by adding a provider registry type that is
responsible for managing all of the plugins required by a plan. In
addition to loading plugins froma checkpoint and providing the ability
to map from a provider reference to a provider plugin, this type serves
as the provider plugin for providers themselves (i.e. it is the
"provider provider").

The latter need is solved via two relatively self-contained changes to
plan setup and the eval source.

During plan setup, the old checkpoint is scanned for custom resources
that do not have a provider reference in order to compute the set of
packages that require a default provider. Once this set has been
computed, the required default provider definitions are conjured and
prepended to the checkpoint's resource list. Each resource that
requires a default provider is then updated to refer to the default
provider for its package.

While an eval source is running, each custom resource registration,
resource read, and invoke that does not name a provider is trapped
before being returned by the source iterator. If no default provider
for the appropriate package has been registered, the eval source
synthesizes an appropriate registration, waits for it to complete, and
records the registered provider's reference. This reference is injected
into the original request, which is then processed as usual. If a
default provider was already registered, the recorded reference is
used and no new registration occurs.

### SDK Changes
These changes only expose first-class providers from the Node.JS SDK.
- A new abstract class, `ProviderResource`, can be subclassed and used
  to instantiate first-class providers.
- A new field in `ResourceOptions`, `provider`, can be used to supply
  a particular provider instance to manage a `CustomResource`'s CRUD
  operations.
- A new type, `InvokeOptions`, can be used to specify options that
  control the behavior of a call to `pulumi.runtime.invoke`. This type
  includes a `provider` field that is analogous to
  `ResourceOptions.provider`.
2018-08-06 17:50:29 -07:00

449 lines
16 KiB
Go

// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package plugin
import (
"os"
"github.com/blang/semver"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/pulumi/pulumi/pkg/diag"
"github.com/pulumi/pulumi/pkg/resource"
"github.com/pulumi/pulumi/pkg/tokens"
"github.com/pulumi/pulumi/pkg/util/cmdutil"
"github.com/pulumi/pulumi/pkg/util/contract"
"github.com/pulumi/pulumi/pkg/util/logging"
"github.com/pulumi/pulumi/pkg/workspace"
)
// A Host hosts provider plugins and makes them easily accessible by package name.
type Host interface {
// ServerAddr returns the address at which the host's RPC interface may be found.
ServerAddr() string
// Log logs a message, including errors and warnings. Messages can have a resource URN
// associated with them. If no urn is provided, the message is global.
Log(sev diag.Severity, urn resource.URN, msg string, streamID int32)
// Analyzer fetches the analyzer with a given name, possibly lazily allocating the plugins for it. If an analyzer
// could not be found, or an error occurred while creating it, a non-nil error is returned.
Analyzer(nm tokens.QName) (Analyzer, error)
// Provider loads a new copy of the provider for a given package. If a provider for this package could not be
// found, or an error occurs while creating it, a non-nil error is returned.
Provider(pkg tokens.Package, version *semver.Version) (Provider, error)
// CloseProvider closes the given provider plugin and deregisters it from this host.
CloseProvider(provider Provider) error
// LanguageRuntime fetches the language runtime plugin for a given language, lazily allocating if necessary. If
// an implementation of this language runtime wasn't found, on an error occurs, a non-nil error is returned.
LanguageRuntime(runtime string) (LanguageRuntime, error)
// ListPlugins lists all plugins that have been loaded, with version information.
ListPlugins() []workspace.PluginInfo
// EnsurePlugins ensures all plugins in the given array are loaded and ready to use. If any plugins are missing,
// and/or there are errors loading one or more plugins, a non-nil error is returned.
EnsurePlugins(plugins []workspace.PluginInfo, kinds Flags) error
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
GetRequiredPlugins(info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error)
// SignalCancellation asks all resource providers to gracefully shut down and abort any ongoing
// operations. Operation aborted in this way will return an error (e.g., `Update` and `Create`
// will either a creation error or an initialization error. SignalCancellation is advisory and
// non-blocking; it is up to the host to decide how long to wait after SignalCancellation is
// called before (e.g.) hard-closing any gRPC connection.
SignalCancellation() error
// Close reclaims any resources associated with the host.
Close() error
}
// Events provides higher-level consumers of the plugin model to attach callbacks on
// plugin load events.
type Events interface {
// OnPluginLoad is fired by the plugin host whenever a new plugin is successfully loaded.
// newPlugin is the plugin that was loaded.
OnPluginLoad(newPlugin workspace.PluginInfo) error
}
// NewDefaultHost implements the standard plugin logic, using the standard installation root to find them.
func NewDefaultHost(ctx *Context, config ConfigSource, events Events,
runtimeOptions map[string]interface{}) (Host, error) {
host := &defaultHost{
ctx: ctx,
config: config,
events: events,
runtimeOptions: runtimeOptions,
analyzerPlugins: make(map[tokens.QName]*analyzerPlugin),
languagePlugins: make(map[string]*languagePlugin),
resourcePlugins: make(map[Provider]*resourcePlugin),
reportedResourcePlugins: make(map[string]struct{}),
loadRequests: make(chan pluginLoadRequest),
}
// Fire up a gRPC server to listen for requests. This acts as a RPC interface that plugins can use
// to "phone home" in case there are things the host must do on behalf of the plugins (like log, etc).
svr, err := newHostServer(host, ctx)
if err != nil {
return nil, err
}
host.server = svr
// Start a goroutine we'll use to satisfy load requests serially and avoid race conditions.
go func() {
for req := range host.loadRequests {
req.result <- req.load()
}
}()
return host, nil
}
type pluginLoadRequest struct {
load func() error
result chan<- error
}
type defaultHost struct {
ctx *Context // the shared context for this host.
config ConfigSource // the source for provider configuration parameters.
events Events // optional callbacks for plugin load events
runtimeOptions map[string]interface{} // options to pass to the language plugins.
analyzerPlugins map[tokens.QName]*analyzerPlugin // a cache of analyzer plugins and their processes.
languagePlugins map[string]*languagePlugin // a cache of language plugins and their processes.
resourcePlugins map[Provider]*resourcePlugin // the set of loaded resource plugins.
reportedResourcePlugins map[string]struct{} // the set of unique resource plugins we'll report.
plugins []workspace.PluginInfo // a list of plugins allocated by this host.
loadRequests chan pluginLoadRequest // a channel used to satisfy plugin load requests.
server *hostServer // the server's RPC machinery.
}
var _ Host = (*defaultHost)(nil)
type analyzerPlugin struct {
Plugin Analyzer
Info workspace.PluginInfo
}
type languagePlugin struct {
Plugin LanguageRuntime
Info workspace.PluginInfo
}
type resourcePlugin struct {
Plugin Provider
Info workspace.PluginInfo
}
func (host *defaultHost) ServerAddr() string {
return host.server.Address()
}
func (host *defaultHost) Log(sev diag.Severity, urn resource.URN, msg string, streamID int32) {
host.ctx.Diag.Logf(sev, diag.StreamMessage(urn, msg, streamID))
}
// loadPlugin sends an appropriate load request to the plugin loader and returns the loaded plugin (if any) and error.
func (host *defaultHost) loadPlugin(load func() (interface{}, error)) (interface{}, error) {
var plugin interface{}
result := make(chan error)
host.loadRequests <- pluginLoadRequest{
load: func() error {
p, err := load()
plugin = p
return err
},
result: result,
}
return plugin, <-result
}
func (host *defaultHost) Analyzer(name tokens.QName) (Analyzer, error) {
plugin, err := host.loadPlugin(func() (interface{}, error) {
// First see if we already loaded this plugin.
if plug, has := host.analyzerPlugins[name]; has {
contract.Assert(plug != nil)
return plug.Plugin, nil
}
// If not, try to load and bind to a plugin.
plug, err := NewAnalyzer(host, host.ctx, name)
if err == nil && plug != nil {
info, infoerr := plug.GetPluginInfo()
if infoerr != nil {
return nil, infoerr
}
// Memoize the result.
host.plugins = append(host.plugins, info)
host.analyzerPlugins[name] = &analyzerPlugin{Plugin: plug, Info: info}
if host.events != nil {
if eventerr := host.events.OnPluginLoad(info); eventerr != nil {
return nil, errors.Wrapf(eventerr, "failed to perform plugin load callback")
}
}
}
return plug, err
})
if plugin == nil || err != nil {
return nil, err
}
return plugin.(Analyzer), nil
}
func (host *defaultHost) Provider(pkg tokens.Package, version *semver.Version) (Provider, error) {
plugin, err := host.loadPlugin(func() (interface{}, error) {
// Try to load and bind to a plugin.
plug, err := NewProvider(host, host.ctx, pkg, version)
if err == nil && plug != nil {
info, infoerr := plug.GetPluginInfo()
if infoerr != nil {
return nil, infoerr
}
// Warn if the plugin version was not what we expected
if version != nil && !cmdutil.IsTruthy(os.Getenv("PULUMI_DEV")) {
if info.Version == nil || !info.Version.GTE(*version) {
var v string
if info.Version != nil {
v = info.Version.String()
}
host.ctx.Diag.Warningf(
diag.Message("", /*urn*/
"resource plugin %s is expected to have version >=%s, but has %s; "+
"the wrong version may be on your path, or this may be a bug in the plugin"),
info.Name, version.String(), v)
}
}
// Record the result and add the plugin's info to our list of loaded plugins if it's the first copy of its
// kind.
key := info.Name
if info.Version != nil {
key += info.Version.String()
}
_, alreadyReported := host.reportedResourcePlugins[key]
if !alreadyReported {
host.reportedResourcePlugins[key] = struct{}{}
host.plugins = append(host.plugins, info)
}
host.resourcePlugins[plug] = &resourcePlugin{Plugin: plug, Info: info}
if host.events != nil && !alreadyReported {
if eventerr := host.events.OnPluginLoad(info); eventerr != nil {
return nil, errors.Wrapf(eventerr, "failed to perform plugin load callback")
}
}
}
return plug, err
})
if plugin == nil || err != nil {
return nil, err
}
return plugin.(Provider), nil
}
func (host *defaultHost) LanguageRuntime(runtime string) (LanguageRuntime, error) {
plugin, err := host.loadPlugin(func() (interface{}, error) {
// First see if we already loaded this plugin.
if plug, has := host.languagePlugins[runtime]; has {
contract.Assert(plug != nil)
return plug.Plugin, nil
}
// If not, allocate a new one.
plug, err := NewLanguageRuntime(host, host.ctx, runtime, host.runtimeOptions)
if err == nil && plug != nil {
info, infoerr := plug.GetPluginInfo()
if infoerr != nil {
return nil, infoerr
}
// Memoize the result.
host.plugins = append(host.plugins, info)
host.languagePlugins[runtime] = &languagePlugin{Plugin: plug, Info: info}
if host.events != nil {
if eventerr := host.events.OnPluginLoad(info); eventerr != nil {
return nil, errors.Wrapf(eventerr, "failed to perform plugin load callback")
}
}
}
return plug, err
})
if plugin == nil || err != nil {
return nil, err
}
return plugin.(LanguageRuntime), nil
}
func (host *defaultHost) ListPlugins() []workspace.PluginInfo {
return host.plugins
}
// EnsurePlugins ensures all plugins in the given array are loaded and ready to use. If any plugins are missing,
// and/or there are errors loading one or more plugins, a non-nil error is returned.
func (host *defaultHost) EnsurePlugins(plugins []workspace.PluginInfo, kinds Flags) error {
// Use a multieerror to track failures so we can return one big list of all failures at the end.
var result error
for _, plugin := range plugins {
switch plugin.Kind {
case workspace.AnalyzerPlugin:
if kinds&AnalyzerPlugins != 0 {
if _, err := host.Analyzer(tokens.QName(plugin.Name)); err != nil {
result = multierror.Append(result,
errors.Wrapf(err, "failed to load analyzer plugin %s", plugin.Name))
}
}
case workspace.LanguagePlugin:
if kinds&LanguagePlugins != 0 {
if _, err := host.LanguageRuntime(plugin.Name); err != nil {
result = multierror.Append(result,
errors.Wrapf(err, "failed to load language plugin %s", plugin.Name))
}
}
case workspace.ResourcePlugin:
if kinds&ResourcePlugins != 0 {
if _, err := host.Provider(tokens.Package(plugin.Name), plugin.Version); err != nil {
result = multierror.Append(result,
errors.Wrapf(err, "failed to load resource plugin %s", plugin.Name))
}
}
default:
contract.Failf("unexpected plugin kind: %s", plugin.Kind)
}
}
return result
}
// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
func (host *defaultHost) GetRequiredPlugins(info ProgInfo, kinds Flags) ([]workspace.PluginInfo, error) {
var plugins []workspace.PluginInfo
if kinds&LanguagePlugins != 0 {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := host.LanguageRuntime(info.Proj.RuntimeInfo.Name())
if err != nil {
return nil, errors.Wrapf(err, "failed to load language plugin %s", info.Proj.RuntimeInfo.Name())
}
plugins = append(plugins, workspace.PluginInfo{
Name: info.Proj.RuntimeInfo.Name(),
Kind: workspace.LanguagePlugin,
})
if kinds&ResourcePlugins != 0 {
// Use the language plugin to compute this project's set of plugin dependencies.
// TODO: we want to support loading precisely what the project needs, rather than doing a static scan of resolved
// packages. Doing this requires that we change our RPC interface and figure out how to configure plugins
// later than we do (right now, we do it up front, but at that point we don't know the version).
deps, err := lang.GetRequiredPlugins(info)
if err != nil {
return nil, errors.Wrapf(err, "failed to discover plugin requirements")
}
plugins = append(plugins, deps...)
}
} else {
// If we can't load the language plugin, we can't discover the resource plugins.
contract.Assertf(kinds&ResourcePlugins != 0,
"cannot load resource plugins without also loading the language plugin")
}
// Next, if there are analyzers listed in the project file, use them too.
// TODO: these are currently not versioned. We probably need to let folks specify versions in Pulumi.yaml.
if info.Proj.Analyzers != nil && kinds&AnalyzerPlugins != 0 {
for _, analyzer := range *info.Proj.Analyzers {
plugins = append(plugins, workspace.PluginInfo{
Name: string(analyzer),
Kind: workspace.AnalyzerPlugin,
})
}
}
return plugins, nil
}
func (host *defaultHost) SignalCancellation() error {
var result error
for _, plug := range host.resourcePlugins {
if err := plug.Plugin.SignalCancellation(); err != nil {
result = multierror.Append(result, errors.Wrapf(err,
"Error signaling cancellation to resource provider '%s'", plug.Info.Name))
}
}
return result
}
func (host *defaultHost) CloseProvider(provider Provider) error {
// NOTE: we're abusing loadPlugin in order to ensure proper synchronization.
_, err := host.loadPlugin(func() (interface{}, error) {
if err := provider.Close(); err != nil {
return nil, err
}
delete(host.resourcePlugins, provider)
return nil, nil
})
return err
}
func (host *defaultHost) Close() error {
// Close all plugins.
for _, plug := range host.analyzerPlugins {
if err := plug.Plugin.Close(); err != nil {
logging.Infof("Error closing '%s' analyzer plugin during shutdown; ignoring: %v", plug.Info.Name, err)
}
}
for _, plug := range host.resourcePlugins {
if err := plug.Plugin.Close(); err != nil {
logging.Infof("Error closing '%s' resource plugin during shutdown; ignoring: %v", plug.Info.Name, err)
}
}
for _, plug := range host.languagePlugins {
if err := plug.Plugin.Close(); err != nil {
logging.Infof("Error closing '%s' language plugin during shutdown; ignoring: %v", plug.Info.Name, err)
}
}
// Empty out all maps.
host.analyzerPlugins = make(map[tokens.QName]*analyzerPlugin)
host.languagePlugins = make(map[string]*languagePlugin)
host.resourcePlugins = make(map[Provider]*resourcePlugin)
// Shut down the plugin loader.
close(host.loadRequests)
// Finally, shut down the host's gRPC server.
return host.server.Cancel()
}
// Flags can be used to filter out plugins during loading that aren't necessary.
type Flags int
const (
// AnalyzerPlugins is used to only load analyzers.
AnalyzerPlugins Flags = 1 << iota
// LanguagePlugins is used to only load language plugins.
LanguagePlugins
// ResourcePlugins is used to only load resource provider plugins.
ResourcePlugins
)
// AllPlugins uses flags to ensure that all plugin kinds are loaded.
var AllPlugins = AnalyzerPlugins | LanguagePlugins | ResourcePlugins