Serialize plugin loads.
As it stands, we allow plugin load requests to race. Not only does this create a situation in which we may load and then immediately throw away a plugin (potentially leaking its process), it also creates the possibility of races when reading from/writing to the various plugin caches. These changes serialize all plugin loads and cache accesses by running all accesses for a particular host in a single goroutine. Fixes #1020.
This commit is contained in:
parent
52b7bf72ff
commit
dc36b9569a
|
@ -53,6 +53,7 @@ func NewDefaultHost(ctx *Context, config ConfigSource) (Host, error) {
|
|||
analyzerPlugins: make(map[tokens.QName]*analyzerPlugin),
|
||||
languagePlugins: make(map[string]*languagePlugin),
|
||||
resourcePlugins: make(map[tokens.Package]*resourcePlugin),
|
||||
loadRequests: make(chan pluginLoadRequest),
|
||||
}
|
||||
|
||||
// Fire up a gRPC server to listen for requests. This acts as a RPC interface that plugins can use
|
||||
|
@ -63,9 +64,21 @@ func NewDefaultHost(ctx *Context, config ConfigSource) (Host, error) {
|
|||
}
|
||||
host.server = svr
|
||||
|
||||
// Start a goroutine we'll use to satisfy load requests serially and avoid race conditions.
|
||||
go func() {
|
||||
for req := range host.loadRequests {
|
||||
req.result <- req.load()
|
||||
}
|
||||
}()
|
||||
|
||||
return host, nil
|
||||
}
|
||||
|
||||
type pluginLoadRequest struct {
|
||||
load func() error
|
||||
result chan<- error
|
||||
}
|
||||
|
||||
type defaultHost struct {
|
||||
ctx *Context // the shared context for this host.
|
||||
config ConfigSource // the source for provider configuration parameters.
|
||||
|
@ -73,6 +86,7 @@ type defaultHost struct {
|
|||
languagePlugins map[string]*languagePlugin // a cache of language plugins and their processes.
|
||||
resourcePlugins map[tokens.Package]*resourcePlugin // a cache of resource plugins and their processes.
|
||||
plugins []workspace.PluginInfo // a list of plugins allocated by this host.
|
||||
loadRequests chan pluginLoadRequest // a channel used to satisfy plugin load requests.
|
||||
server *hostServer // the server's RPC machinery.
|
||||
}
|
||||
|
||||
|
@ -99,114 +113,148 @@ func (host *defaultHost) Log(sev diag.Severity, msg string) {
|
|||
host.ctx.Diag.Logf(sev, diag.RawMessage(msg))
|
||||
}
|
||||
|
||||
func (host *defaultHost) Analyzer(name tokens.QName) (Analyzer, error) {
|
||||
// First see if we already loaded this plugin.
|
||||
if plug, has := host.analyzerPlugins[name]; has {
|
||||
contract.Assert(plug != nil)
|
||||
return plug.Plugin, nil
|
||||
}
|
||||
// loadPlugin sends an appropriate load request to the plugin loader and returns the loaded plugin (if any) and error.
|
||||
func (host *defaultHost) loadPlugin(load func() (interface{}, error)) (interface{}, error) {
|
||||
var plugin interface{}
|
||||
|
||||
// If not, try to load and bind to a plugin.
|
||||
plug, err := NewAnalyzer(host, host.ctx, name)
|
||||
if err == nil && plug != nil {
|
||||
info, infoerr := plug.GetPluginInfo()
|
||||
if infoerr != nil {
|
||||
return nil, infoerr
|
||||
result := make(chan error)
|
||||
host.loadRequests <- pluginLoadRequest{
|
||||
load: func() error {
|
||||
p, err := load()
|
||||
plugin = p
|
||||
return err
|
||||
},
|
||||
result: result,
|
||||
}
|
||||
return plugin, <-result
|
||||
}
|
||||
|
||||
func (host *defaultHost) Analyzer(name tokens.QName) (Analyzer, error) {
|
||||
plugin, err := host.loadPlugin(func() (interface{}, error) {
|
||||
// First see if we already loaded this plugin.
|
||||
if plug, has := host.analyzerPlugins[name]; has {
|
||||
contract.Assert(plug != nil)
|
||||
return plug.Plugin, nil
|
||||
}
|
||||
|
||||
// Memoize the result.
|
||||
host.plugins = append(host.plugins, info)
|
||||
host.analyzerPlugins[name] = &analyzerPlugin{Plugin: plug, Info: info}
|
||||
}
|
||||
// If not, try to load and bind to a plugin.
|
||||
plug, err := NewAnalyzer(host, host.ctx, name)
|
||||
if err == nil && plug != nil {
|
||||
info, infoerr := plug.GetPluginInfo()
|
||||
if infoerr != nil {
|
||||
return nil, infoerr
|
||||
}
|
||||
|
||||
return plug, err
|
||||
// Memoize the result.
|
||||
host.plugins = append(host.plugins, info)
|
||||
host.analyzerPlugins[name] = &analyzerPlugin{Plugin: plug, Info: info}
|
||||
}
|
||||
|
||||
return plug, err
|
||||
})
|
||||
if plugin == nil || err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return plugin.(Analyzer), nil
|
||||
}
|
||||
|
||||
func (host *defaultHost) Provider(pkg tokens.Package, version *semver.Version) (Provider, error) {
|
||||
// First see if we already loaded this plugin.
|
||||
if plug, has := host.resourcePlugins[pkg]; has {
|
||||
contract.Assert(plug != nil)
|
||||
plugin, err := host.loadPlugin(func() (interface{}, error) {
|
||||
// First see if we already loaded this plugin.
|
||||
if plug, has := host.resourcePlugins[pkg]; has {
|
||||
contract.Assert(plug != nil)
|
||||
|
||||
// Make sure the versions match.
|
||||
// TODO: support loading multiple plugin versions side-by-side.
|
||||
if version != nil {
|
||||
if plug.Info.Version == nil {
|
||||
return nil,
|
||||
errors.Errorf("resource plugin version %s requested, but an unknown version was found",
|
||||
version.String())
|
||||
} else if !version.EQ(*plug.Info.Version) {
|
||||
return nil,
|
||||
errors.Errorf("resource plugin version %s requested, but version %s was found",
|
||||
version.String(), plug.Info.Version.String())
|
||||
}
|
||||
}
|
||||
|
||||
return plug.Plugin, nil
|
||||
}
|
||||
|
||||
// If not, try to load and bind to a plugin.
|
||||
plug, err := NewProvider(host, host.ctx, pkg, version)
|
||||
if err == nil && plug != nil {
|
||||
info, infoerr := plug.GetPluginInfo()
|
||||
if infoerr != nil {
|
||||
return nil, infoerr
|
||||
}
|
||||
|
||||
// Warn if the plugin version was not what we expected
|
||||
if version != nil {
|
||||
if info.Version == nil || !version.EQ(*info.Version) {
|
||||
var v string
|
||||
if info.Version != nil {
|
||||
v = info.Version.String()
|
||||
// Make sure the versions match.
|
||||
// TODO: support loading multiple plugin versions side-by-side.
|
||||
if version != nil {
|
||||
if plug.Info.Version == nil {
|
||||
return nil,
|
||||
errors.Errorf("resource plugin version %s requested, but an unknown version was found",
|
||||
version.String())
|
||||
} else if !version.EQ(*plug.Info.Version) {
|
||||
return nil,
|
||||
errors.Errorf("resource plugin version %s requested, but version %s was found",
|
||||
version.String(), plug.Info.Version.String())
|
||||
}
|
||||
host.ctx.Diag.Warningf(
|
||||
diag.Message("resource plugin %s mis-reported its own version, expected %s got %s"),
|
||||
info.Name, version.String(), v)
|
||||
}
|
||||
|
||||
return plug.Plugin, nil
|
||||
}
|
||||
|
||||
// Configure the provider. If no configuration source is present, assume no configuration. We do this here
|
||||
// because resource providers must be configured exactly once before any method besides Configure is called.
|
||||
providerConfig := make(map[config.Key]string)
|
||||
if host.config != nil {
|
||||
providerConfig, err = host.config.GetPackageConfig(pkg)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to fetch configuration for pkg '%v' resource provider", pkg)
|
||||
// If not, try to load and bind to a plugin.
|
||||
plug, err := NewProvider(host, host.ctx, pkg, version)
|
||||
if err == nil && plug != nil {
|
||||
info, infoerr := plug.GetPluginInfo()
|
||||
if infoerr != nil {
|
||||
return nil, infoerr
|
||||
}
|
||||
}
|
||||
if err = plug.Configure(providerConfig); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to configure pkg '%v' resource provider", pkg)
|
||||
|
||||
// Warn if the plugin version was not what we expected
|
||||
if version != nil {
|
||||
if info.Version == nil || !version.EQ(*info.Version) {
|
||||
var v string
|
||||
if info.Version != nil {
|
||||
v = info.Version.String()
|
||||
}
|
||||
host.ctx.Diag.Warningf(
|
||||
diag.Message("resource plugin %s mis-reported its own version, expected %s got %s"),
|
||||
info.Name, version.String(), v)
|
||||
}
|
||||
}
|
||||
|
||||
// Configure the provider. If no configuration source is present, assume no configuration. We do this here
|
||||
// because resource providers must be configured exactly once before any method besides Configure is called.
|
||||
providerConfig := make(map[config.Key]string)
|
||||
if host.config != nil {
|
||||
providerConfig, err = host.config.GetPackageConfig(pkg)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to fetch configuration for pkg '%v' resource provider", pkg)
|
||||
}
|
||||
}
|
||||
if err = plug.Configure(providerConfig); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to configure pkg '%v' resource provider", pkg)
|
||||
}
|
||||
|
||||
// Memoize the result.
|
||||
host.plugins = append(host.plugins, info)
|
||||
host.resourcePlugins[pkg] = &resourcePlugin{Plugin: plug, Info: info}
|
||||
}
|
||||
|
||||
// Memoize the result.
|
||||
host.plugins = append(host.plugins, info)
|
||||
host.resourcePlugins[pkg] = &resourcePlugin{Plugin: plug, Info: info}
|
||||
return plug, err
|
||||
})
|
||||
if plugin == nil || err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return plug, err
|
||||
return plugin.(Provider), nil
|
||||
}
|
||||
|
||||
func (host *defaultHost) LanguageRuntime(runtime string) (LanguageRuntime, error) {
|
||||
// First see if we already loaded this plugin.
|
||||
if plug, has := host.languagePlugins[runtime]; has {
|
||||
contract.Assert(plug != nil)
|
||||
return plug.Plugin, nil
|
||||
}
|
||||
|
||||
// If not, allocate a new one.
|
||||
plug, err := NewLanguageRuntime(host, host.ctx, runtime)
|
||||
if err == nil && plug != nil {
|
||||
info, infoerr := plug.GetPluginInfo()
|
||||
if infoerr != nil {
|
||||
return nil, infoerr
|
||||
plugin, err := host.loadPlugin(func() (interface{}, error) {
|
||||
// First see if we already loaded this plugin.
|
||||
if plug, has := host.languagePlugins[runtime]; has {
|
||||
contract.Assert(plug != nil)
|
||||
return plug.Plugin, nil
|
||||
}
|
||||
|
||||
// Memoize the result.
|
||||
host.plugins = append(host.plugins, info)
|
||||
host.languagePlugins[runtime] = &languagePlugin{Plugin: plug, Info: info}
|
||||
}
|
||||
// If not, allocate a new one.
|
||||
plug, err := NewLanguageRuntime(host, host.ctx, runtime)
|
||||
if err == nil && plug != nil {
|
||||
info, infoerr := plug.GetPluginInfo()
|
||||
if infoerr != nil {
|
||||
return nil, infoerr
|
||||
}
|
||||
|
||||
return plug, err
|
||||
// Memoize the result.
|
||||
host.plugins = append(host.plugins, info)
|
||||
host.languagePlugins[runtime] = &languagePlugin{Plugin: plug, Info: info}
|
||||
}
|
||||
|
||||
return plug, err
|
||||
})
|
||||
if plugin == nil || err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return plugin.(LanguageRuntime), nil
|
||||
}
|
||||
|
||||
func (host *defaultHost) ListPlugins() []workspace.PluginInfo {
|
||||
|
@ -312,6 +360,9 @@ func (host *defaultHost) Close() error {
|
|||
host.languagePlugins = make(map[string]*languagePlugin)
|
||||
host.resourcePlugins = make(map[tokens.Package]*resourcePlugin)
|
||||
|
||||
// Shut down the plugin loader.
|
||||
close(host.loadRequests)
|
||||
|
||||
// Finally, shut down the host's gRPC server.
|
||||
return host.server.Cancel()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue