From a13a83b06712cc5b191490159f2f78eb79f4bf84 Mon Sep 17 00:00:00 2001 From: joeduffy Date: Sun, 3 Sep 2017 12:37:56 -0700 Subject: [PATCH] Pass the monitor address correctly to language plugins --- pkg/resource/deploy/plan_test.go | 6 +++--- pkg/resource/deploy/source_eval.go | 2 +- pkg/resource/plugin/analyzer_plugin.go | 2 +- pkg/resource/plugin/host.go | 20 ++++---------------- pkg/resource/plugin/langruntime_plugin.go | 4 ++-- pkg/resource/plugin/plugin.go | 20 ++++++++++++-------- pkg/resource/plugin/provider_plugin.go | 2 +- 7 files changed, 24 insertions(+), 32 deletions(-) diff --git a/pkg/resource/deploy/plan_test.go b/pkg/resource/deploy/plan_test.go index 3b38f10c9..7029ad400 100644 --- a/pkg/resource/deploy/plan_test.go +++ b/pkg/resource/deploy/plan_test.go @@ -345,7 +345,7 @@ func (g *testSourceGoal) Done(state *resource.State) { type testProviderHost struct { analyzer func(nm tokens.QName) (plugin.Analyzer, error) provider func(pkg tokens.Package) (plugin.Provider, error) - langhost func(runtime string) (plugin.LanguageRuntime, error) + langhost func(runtime string, monitorAddr string) (plugin.LanguageRuntime, error) } func (host *testProviderHost) Close() error { @@ -370,8 +370,8 @@ func (host *testProviderHost) Analyzer(nm tokens.QName) (plugin.Analyzer, error) func (host *testProviderHost) Provider(pkg tokens.Package) (plugin.Provider, error) { return host.provider(pkg) } -func (host *testProviderHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) { - return host.langhost(runtime) +func (host *testProviderHost) LanguageRuntime(runtime string, monitorAddr string) (plugin.LanguageRuntime, error) { + return host.langhost(runtime, monitorAddr) } type testProvider struct { diff --git a/pkg/resource/deploy/source_eval.go b/pkg/resource/deploy/source_eval.go index d3975b939..a904ad2ed 100644 --- a/pkg/resource/deploy/source_eval.go +++ b/pkg/resource/deploy/source_eval.go @@ -73,7 +73,7 @@ func (src *evalSource) Iterate() (SourceIterator, error) { // Next fire up the language plugin. // IDEA: cache these so we reuse the same language plugin instance; if we do this, monitors must be per-run. rt := src.runinfo.Pkg.Runtime - langhost, err := src.plugctx.Host.LanguageRuntime(rt) + langhost, err := src.plugctx.Host.LanguageRuntime(rt, mon.Address()) if err != nil { return nil, errors.Wrapf(err, "failed to launch language host for '%v'", src.runinfo.Pkg.Runtime) } else if langhost == nil { diff --git a/pkg/resource/plugin/analyzer_plugin.go b/pkg/resource/plugin/analyzer_plugin.go index 72871b0d0..6f431b3d1 100644 --- a/pkg/resource/plugin/analyzer_plugin.go +++ b/pkg/resource/plugin/analyzer_plugin.go @@ -28,7 +28,7 @@ type analyzer struct { func NewAnalyzer(host Host, ctx *Context, name tokens.QName) (Analyzer, error) { // Go ahead and attempt to load the plugin from the PATH. srvexe := AnalyzerPluginPrefix + strings.Replace(string(name), tokens.QNameDelimiter, "_", -1) - plug, err := newPlugin(host, ctx, srvexe, fmt.Sprintf("analyzer[%v]", name)) + plug, err := newPlugin(ctx, srvexe, fmt.Sprintf("analyzer[%v]", name), []string{host.ServerAddr()}) if err != nil { return nil, err } else if plug == nil { diff --git a/pkg/resource/plugin/host.go b/pkg/resource/plugin/host.go index 20d54162f..aa5ac0977 100644 --- a/pkg/resource/plugin/host.go +++ b/pkg/resource/plugin/host.go @@ -26,7 +26,7 @@ type Host interface { Provider(pkg tokens.Package) (Provider, error) // LanguageRuntime fetches the language runtime plugin for a given language, lazily allocating if necessary. If // an implementation of this language runtime wasn't found, on an error occurs, a non-nil error is returned. - LanguageRuntime(runtime string) (LanguageRuntime, error) + LanguageRuntime(runtime string, monitorAddr string) (LanguageRuntime, error) // Close reclaims any resources associated with the host. Close() error @@ -38,7 +38,6 @@ func NewDefaultHost(ctx *Context) (Host, error) { ctx: ctx, analyzers: make(map[tokens.QName]Analyzer), providers: make(map[tokens.Package]Provider), - langhosts: make(map[string]LanguageRuntime), } // Fire up a gRPC server to listen for requests. This acts as a RPC interface that plugins can use @@ -56,7 +55,6 @@ type defaultHost struct { ctx *Context // the shared context for this host. analyzers map[tokens.QName]Analyzer // a cache of analyzer plugins and their processes. providers map[tokens.Package]Provider // a cache of provider plugins and their processes. - langhosts map[string]LanguageRuntime // a cache of language runtime plugins and their processes. server *hostServer // the server's RPC machinery. } @@ -98,19 +96,9 @@ func (host *defaultHost) Provider(pkg tokens.Package) (Provider, error) { return plug, err } -func (host *defaultHost) LanguageRuntime(runtime string) (LanguageRuntime, error) { - // First see if we already loaded this plugin. - if plug, has := host.langhosts[runtime]; has { - contract.Assert(plug != nil) - return plug, nil - } - - // If not, try to load and bind to a plugin. - plug, err := NewLanguageRuntime(host, host.ctx, runtime) - if err == nil && plug != nil { - host.langhosts[runtime] = plug // memoize the result. - } - return plug, err +func (host *defaultHost) LanguageRuntime(runtime string, monitorAddr string) (LanguageRuntime, error) { + // Always load a fresh language runtime, since each has a unique resource monitor session. + return NewLanguageRuntime(host.ctx, runtime, monitorAddr) } func (host *defaultHost) Close() error { diff --git a/pkg/resource/plugin/langruntime_plugin.go b/pkg/resource/plugin/langruntime_plugin.go index 7536daa4e..3b06f07c0 100644 --- a/pkg/resource/plugin/langruntime_plugin.go +++ b/pkg/resource/plugin/langruntime_plugin.go @@ -24,10 +24,10 @@ type langhost struct { // NewLanguageRuntime binds to a language's runtime plugin and then creates a gRPC connection to it. If the // plugin could not be found, or an error occurs while creating the child process, an error is returned. -func NewLanguageRuntime(host Host, ctx *Context, runtime string) (LanguageRuntime, error) { +func NewLanguageRuntime(ctx *Context, runtime string, monitorAddr string) (LanguageRuntime, error) { // Go ahead and attempt to load the plugin from the PATH. srvexe := LanguagePluginPrefix + strings.Replace(runtime, tokens.QNameDelimiter, "_", -1) - plug, err := newPlugin(host, ctx, srvexe, fmt.Sprintf("langhost[%v]", runtime)) + plug, err := newPlugin(ctx, srvexe, fmt.Sprintf("langhost[%v]", runtime), []string{monitorAddr}) if err != nil { return nil, err } else if plug == nil { diff --git a/pkg/resource/plugin/plugin.go b/pkg/resource/plugin/plugin.go index 8194eaf2a..59c6ae9c0 100644 --- a/pkg/resource/plugin/plugin.go +++ b/pkg/resource/plugin/plugin.go @@ -10,6 +10,7 @@ import ( "os/exec" "strconv" + "github.com/golang/glog" "github.com/pkg/errors" "google.golang.org/grpc" @@ -26,9 +27,17 @@ type plugin struct { Stderr io.ReadCloser } -func newPlugin(host Host, ctx *Context, bin string, prefix string) (*plugin, error) { +func newPlugin(ctx *Context, bin string, prefix string, args []string) (*plugin, error) { + if glog.V(9) { + var argstr string + for _, arg := range args { + argstr += " " + arg + } + glog.V(9).Infof("Launching plugin '%v' from '%v' with args '%v'", prefix, bin, argstr) + } + // Try to execute the binary. - plug, err := execPlugin(host, bin) + plug, err := execPlugin(bin, args) if err != nil { // If we failed simply because we couldn't load the binary, return nil rather than an error. if execerr, isexecerr := err.(*exec.Error); isexecerr && execerr.Err == exec.ErrNotFound { @@ -104,13 +113,8 @@ func newPlugin(host Host, ctx *Context, bin string, prefix string) (*plugin, err return plug, nil } -func execPlugin(host Host, bin string) (*plugin, error) { +func execPlugin(bin string, args []string) (*plugin, error) { // Flow the logging information if set. - var args []string - - // Append the argument that tells the plugin the address for the engine. - args = append(args, host.ServerAddr()) - if cmdutil.LogFlow { if cmdutil.LogToStderr { args = append(args, "--logtostderr") diff --git a/pkg/resource/plugin/provider_plugin.go b/pkg/resource/plugin/provider_plugin.go index e1637a46d..3ea5187b2 100644 --- a/pkg/resource/plugin/provider_plugin.go +++ b/pkg/resource/plugin/provider_plugin.go @@ -30,7 +30,7 @@ type provider struct { func NewProvider(host Host, ctx *Context, pkg tokens.Package) (Provider, error) { // Go ahead and attempt to load the plugin from the PATH. srvexe := ProviderPluginPrefix + strings.Replace(string(pkg), tokens.QNameDelimiter, "_", -1) - plug, err := newPlugin(host, ctx, srvexe, fmt.Sprintf("resource[%v]", pkg)) + plug, err := newPlugin(ctx, srvexe, fmt.Sprintf("resource[%v]", pkg), []string{host.ServerAddr()}) if err != nil { return nil, err } else if plug == nil {