pulumi/pkg/resource/deploy/deploytest/pluginhost.go
Pat Gavlin 24f89e1121
Close plugin context on plan creation failure (#2304)
This ensures that the gRPC server is properly shut down. This fixes an
issue in which a resource plugin that is still configuring could report
log messages to the plugin host, which would in turn attempt to send
diagnostic packets over a closed channel, causing a panic.

Fixes #2170.
2018-12-18 13:25:52 -08:00

179 lines
4.4 KiB
Go

// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deploytest
import (
"sync"
"github.com/blang/semver"
"github.com/pkg/errors"
"github.com/pulumi/pulumi/pkg/diag"
"github.com/pulumi/pulumi/pkg/resource"
"github.com/pulumi/pulumi/pkg/resource/plugin"
"github.com/pulumi/pulumi/pkg/tokens"
"github.com/pulumi/pulumi/pkg/workspace"
)
// LoadProviderFunc is a callback that produces a provider (or an error) without needing any
// arguments. The plugin host invokes it when a matching provider is requested.
type LoadProviderFunc func() (plugin.Provider, error)
// LoadProviderWithHostFunc is a callback that produces a provider (or an error) and is handed the
// plugin host that is loading it.
type LoadProviderWithHostFunc func(host plugin.Host) (plugin.Provider, error)
// ProviderLoader describes how to load a provider for a particular package and version. The
// constructors in this file set exactly one of load or loadWithHost.
type ProviderLoader struct {
// pkg is the package this loader can supply a provider for.
pkg tokens.Package
// version is the provider version this loader supplies.
version semver.Version
// load, when non-nil, is used to create the provider.
load LoadProviderFunc
// loadWithHost is used to create the provider when load is nil; it receives the plugin host.
loadWithHost LoadProviderWithHostFunc
}
// NewProviderLoader creates a ProviderLoader for the given package and version whose provider is
// produced by a load function that takes no arguments. The host-aware load function is left unset.
func NewProviderLoader(pkg tokens.Package, version semver.Version, load LoadProviderFunc) *ProviderLoader {
	loader := new(ProviderLoader)
	loader.pkg = pkg
	loader.version = version
	loader.load = load
	return loader
}
// NewProviderLoaderWithHost creates a ProviderLoader for the given package and version whose
// provider is produced by a load function that receives the plugin host. The argument-less load
// function is left unset.
func NewProviderLoaderWithHost(pkg tokens.Package, version semver.Version,
	load LoadProviderWithHostFunc) *ProviderLoader {

	loader := new(ProviderLoader)
	loader.pkg = pkg
	loader.version = version
	loader.loadWithHost = load
	return loader
}
// pluginHost is an implementation of plugin.Host backed by a fixed set of provider loaders and a
// single language runtime, rather than by real plugin processes.
type pluginHost struct {
// providerLoaders is the set of loaders searched when a provider is requested.
providerLoaders []*ProviderLoader
// languageRuntime is returned for every language runtime request.
languageRuntime plugin.LanguageRuntime
// sink receives messages passed to Log.
sink diag.Sink
// statusSink receives messages passed to LogStatus.
statusSink diag.Sink
// providers is the set of providers handed out by Provider and not yet removed by CloseProvider.
providers map[plugin.Provider]struct{}
// closed is set by Close; once set, Log and LogStatus drop their messages.
closed bool
// m guards providers and closed.
m sync.Mutex
}
// NewPluginHost creates a plugin.Host that logs to the given sinks, answers every language runtime
// request with languageRuntime, and serves providers from providerLoaders. The host starts out
// open and with no registered providers.
func NewPluginHost(sink, statusSink diag.Sink, languageRuntime plugin.LanguageRuntime,
	providerLoaders ...*ProviderLoader) plugin.Host {

	host := &pluginHost{
		sink:            sink,
		statusSink:      statusSink,
		languageRuntime: languageRuntime,
		providerLoaders: providerLoaders,
	}
	host.providers = make(map[plugin.Provider]struct{})
	return host
}
// isClosed reports whether Close has been called on this host. The flag is read while holding the
// host's mutex.
func (host *pluginHost) isClosed() bool {
	host.m.Lock()
	closed := host.closed
	host.m.Unlock()
	return closed
}
// Provider loads a provider for the given package. Among the host's loaders for that package it
// picks the one with the greatest version; if version is non-nil, loaders older than *version are
// ignored. If no loader qualifies, it returns (nil, nil).
//
// The chosen loader's load function is preferred; if it is unset, the host-aware load function is
// called with this host. A loader with neither function set yields an error rather than a nil
// function call. Any error from the load function is returned as-is. On success the provider is
// recorded in the host's provider set before being returned.
func (host *pluginHost) Provider(pkg tokens.Package, version *semver.Version) (plugin.Provider, error) {
	var best *ProviderLoader
	for _, l := range host.providerLoaders {
		if l.pkg != pkg {
			continue
		}
		if version != nil && l.version.LT(*version) {
			continue
		}
		if best == nil || l.version.GT(best.version) {
			best = l
		}
	}
	if best == nil {
		return nil, nil
	}

	var (
		prov plugin.Provider
		err  error
	)
	switch {
	case best.load != nil:
		prov, err = best.load()
	case best.loadWithHost != nil:
		prov, err = best.loadWithHost(host)
	default:
		// Neither callback was supplied (e.g. a loader constructed with a nil function); calling
		// through would dereference a nil function value.
		return nil, errors.Errorf(
			"provider loader for package %q at version %v has no load function", pkg, best.version)
	}
	if err != nil {
		return nil, err
	}

	// Loading happens outside the lock; only the bookkeeping is guarded.
	host.m.Lock()
	defer host.m.Unlock()
	host.providers[prov] = struct{}{}
	return prov, nil
}
// LanguageRuntime returns the language runtime this host was constructed with. The requested
// runtime name is ignored and the returned error is always nil.
func (host *pluginHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
	langhost := host.languageRuntime
	return langhost, nil
}
// SignalCancellation forwards the cancellation signal to every provider currently registered with
// the host, holding the host's mutex for the duration. Every provider is signalled even if an
// earlier one fails. If any fail, the error from the last failing provider visited is returned;
// since map iteration order is unspecified, which error that is may vary. Returns nil if all
// providers succeed.
func (host *pluginHost) SignalCancellation() error {
	host.m.Lock()
	defer host.m.Unlock()

	var result error
	for prov := range host.providers {
		cancelErr := prov.SignalCancellation()
		if cancelErr == nil {
			continue
		}
		result = cancelErr
	}
	return result
}
// Close marks the host as closed, after which Log and LogStatus discard their messages. It always
// returns nil.
func (host *pluginHost) Close() error {
	host.m.Lock()
	host.closed = true
	host.m.Unlock()
	return nil
}
// ServerAddr is not supported by this host: it always panics instead of returning an address.
func (host *pluginHost) ServerAddr() string {
	panic("Host RPC address not available")
}
// Log writes a stream message for the given URN to the host's diagnostic sink at the given
// severity. Messages are silently dropped once the host has been closed.
func (host *pluginHost) Log(sev diag.Severity, urn resource.URN, msg string, streamID int32) {
	if host.isClosed() {
		return
	}
	host.sink.Logf(sev, diag.StreamMessage(urn, msg, streamID))
}
// LogStatus writes a stream message for the given URN to the host's status sink at the given
// severity. Messages are silently dropped once the host has been closed.
func (host *pluginHost) LogStatus(sev diag.Severity, urn resource.URN, msg string, streamID int32) {
	if host.isClosed() {
		return
	}
	host.statusSink.Logf(sev, diag.StreamMessage(urn, msg, streamID))
}
// Analyzer is not supported by this host: it always returns a nil analyzer and an "unsupported"
// error, regardless of the requested name.
func (host *pluginHost) Analyzer(nm tokens.QName) (plugin.Analyzer, error) {
	unsupported := errors.New("unsupported")
	return nil, unsupported
}
// CloseProvider removes the given provider from the host's set of registered providers, so it is
// no longer signalled on cancellation. Removing a provider that is not registered is a no-op. It
// always returns nil.
func (host *pluginHost) CloseProvider(provider plugin.Provider) error {
	host.m.Lock()
	defer host.m.Unlock()

	delete(host.providers, provider)
	return nil
}
// ListPlugins always returns a nil slice; this host does not report any plugins.
func (host *pluginHost) ListPlugins() []workspace.PluginInfo {
	var none []workspace.PluginInfo
	return none
}
// EnsurePlugins is a no-op for this host: both arguments are ignored and it always returns nil.
func (host *pluginHost) EnsurePlugins(plugins []workspace.PluginInfo, kinds plugin.Flags) error {
	return nil
}
// GetRequiredPlugins is a no-op for this host: both arguments are ignored and it always returns a
// nil plugin list with a nil error.
func (host *pluginHost) GetRequiredPlugins(info plugin.ProgInfo,
	kinds plugin.Flags) ([]workspace.PluginInfo, error) {

	var none []workspace.PluginInfo
	return none, nil
}