Pass the monitor address correctly to language plugins

This commit is contained in:
joeduffy 2017-09-03 12:37:56 -07:00
parent 311550b5e9
commit a13a83b067
7 changed files with 24 additions and 32 deletions

View file

@ -345,7 +345,7 @@ func (g *testSourceGoal) Done(state *resource.State) {
type testProviderHost struct { type testProviderHost struct {
analyzer func(nm tokens.QName) (plugin.Analyzer, error) analyzer func(nm tokens.QName) (plugin.Analyzer, error)
provider func(pkg tokens.Package) (plugin.Provider, error) provider func(pkg tokens.Package) (plugin.Provider, error)
langhost func(runtime string) (plugin.LanguageRuntime, error) langhost func(runtime string, monitorAddr string) (plugin.LanguageRuntime, error)
} }
func (host *testProviderHost) Close() error { func (host *testProviderHost) Close() error {
@ -370,8 +370,8 @@ func (host *testProviderHost) Analyzer(nm tokens.QName) (plugin.Analyzer, error)
func (host *testProviderHost) Provider(pkg tokens.Package) (plugin.Provider, error) { func (host *testProviderHost) Provider(pkg tokens.Package) (plugin.Provider, error) {
return host.provider(pkg) return host.provider(pkg)
} }
func (host *testProviderHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) { func (host *testProviderHost) LanguageRuntime(runtime string, monitorAddr string) (plugin.LanguageRuntime, error) {
return host.langhost(runtime) return host.langhost(runtime, monitorAddr)
} }
type testProvider struct { type testProvider struct {

View file

@ -73,7 +73,7 @@ func (src *evalSource) Iterate() (SourceIterator, error) {
// Next fire up the language plugin. // Next fire up the language plugin.
// IDEA: cache these so we reuse the same language plugin instance; if we do this, monitors must be per-run. // IDEA: cache these so we reuse the same language plugin instance; if we do this, monitors must be per-run.
rt := src.runinfo.Pkg.Runtime rt := src.runinfo.Pkg.Runtime
langhost, err := src.plugctx.Host.LanguageRuntime(rt) langhost, err := src.plugctx.Host.LanguageRuntime(rt, mon.Address())
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to launch language host for '%v'", src.runinfo.Pkg.Runtime) return nil, errors.Wrapf(err, "failed to launch language host for '%v'", src.runinfo.Pkg.Runtime)
} else if langhost == nil { } else if langhost == nil {

View file

@ -28,7 +28,7 @@ type analyzer struct {
func NewAnalyzer(host Host, ctx *Context, name tokens.QName) (Analyzer, error) { func NewAnalyzer(host Host, ctx *Context, name tokens.QName) (Analyzer, error) {
// Go ahead and attempt to load the plugin from the PATH. // Go ahead and attempt to load the plugin from the PATH.
srvexe := AnalyzerPluginPrefix + strings.Replace(string(name), tokens.QNameDelimiter, "_", -1) srvexe := AnalyzerPluginPrefix + strings.Replace(string(name), tokens.QNameDelimiter, "_", -1)
plug, err := newPlugin(host, ctx, srvexe, fmt.Sprintf("analyzer[%v]", name)) plug, err := newPlugin(ctx, srvexe, fmt.Sprintf("analyzer[%v]", name), []string{host.ServerAddr()})
if err != nil { if err != nil {
return nil, err return nil, err
} else if plug == nil { } else if plug == nil {

View file

@ -26,7 +26,7 @@ type Host interface {
Provider(pkg tokens.Package) (Provider, error) Provider(pkg tokens.Package) (Provider, error)
// LanguageRuntime fetches the language runtime plugin for a given language, lazily allocating if necessary. If // LanguageRuntime fetches the language runtime plugin for a given language, lazily allocating if necessary. If
// an implementation of this language runtime wasn't found, on an error occurs, a non-nil error is returned. // an implementation of this language runtime wasn't found, on an error occurs, a non-nil error is returned.
LanguageRuntime(runtime string) (LanguageRuntime, error) LanguageRuntime(runtime string, monitorAddr string) (LanguageRuntime, error)
// Close reclaims any resources associated with the host. // Close reclaims any resources associated with the host.
Close() error Close() error
@ -38,7 +38,6 @@ func NewDefaultHost(ctx *Context) (Host, error) {
ctx: ctx, ctx: ctx,
analyzers: make(map[tokens.QName]Analyzer), analyzers: make(map[tokens.QName]Analyzer),
providers: make(map[tokens.Package]Provider), providers: make(map[tokens.Package]Provider),
langhosts: make(map[string]LanguageRuntime),
} }
// Fire up a gRPC server to listen for requests. This acts as a RPC interface that plugins can use // Fire up a gRPC server to listen for requests. This acts as a RPC interface that plugins can use
@ -56,7 +55,6 @@ type defaultHost struct {
ctx *Context // the shared context for this host. ctx *Context // the shared context for this host.
analyzers map[tokens.QName]Analyzer // a cache of analyzer plugins and their processes. analyzers map[tokens.QName]Analyzer // a cache of analyzer plugins and their processes.
providers map[tokens.Package]Provider // a cache of provider plugins and their processes. providers map[tokens.Package]Provider // a cache of provider plugins and their processes.
langhosts map[string]LanguageRuntime // a cache of language runtime plugins and their processes.
server *hostServer // the server's RPC machinery. server *hostServer // the server's RPC machinery.
} }
@ -98,19 +96,9 @@ func (host *defaultHost) Provider(pkg tokens.Package) (Provider, error) {
return plug, err return plug, err
} }
func (host *defaultHost) LanguageRuntime(runtime string) (LanguageRuntime, error) { func (host *defaultHost) LanguageRuntime(runtime string, monitorAddr string) (LanguageRuntime, error) {
// First see if we already loaded this plugin. // Always load a fresh language runtime, since each has a unique resource monitor session.
if plug, has := host.langhosts[runtime]; has { return NewLanguageRuntime(host.ctx, runtime, monitorAddr)
contract.Assert(plug != nil)
return plug, nil
}
// If not, try to load and bind to a plugin.
plug, err := NewLanguageRuntime(host, host.ctx, runtime)
if err == nil && plug != nil {
host.langhosts[runtime] = plug // memoize the result.
}
return plug, err
} }
func (host *defaultHost) Close() error { func (host *defaultHost) Close() error {

View file

@ -24,10 +24,10 @@ type langhost struct {
// NewLanguageRuntime binds to a language's runtime plugin and then creates a gRPC connection to it. If the // NewLanguageRuntime binds to a language's runtime plugin and then creates a gRPC connection to it. If the
// plugin could not be found, or an error occurs while creating the child process, an error is returned. // plugin could not be found, or an error occurs while creating the child process, an error is returned.
func NewLanguageRuntime(host Host, ctx *Context, runtime string) (LanguageRuntime, error) { func NewLanguageRuntime(ctx *Context, runtime string, monitorAddr string) (LanguageRuntime, error) {
// Go ahead and attempt to load the plugin from the PATH. // Go ahead and attempt to load the plugin from the PATH.
srvexe := LanguagePluginPrefix + strings.Replace(runtime, tokens.QNameDelimiter, "_", -1) srvexe := LanguagePluginPrefix + strings.Replace(runtime, tokens.QNameDelimiter, "_", -1)
plug, err := newPlugin(host, ctx, srvexe, fmt.Sprintf("langhost[%v]", runtime)) plug, err := newPlugin(ctx, srvexe, fmt.Sprintf("langhost[%v]", runtime), []string{monitorAddr})
if err != nil { if err != nil {
return nil, err return nil, err
} else if plug == nil { } else if plug == nil {

View file

@ -10,6 +10,7 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"github.com/golang/glog"
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -26,9 +27,17 @@ type plugin struct {
Stderr io.ReadCloser Stderr io.ReadCloser
} }
func newPlugin(host Host, ctx *Context, bin string, prefix string) (*plugin, error) { func newPlugin(ctx *Context, bin string, prefix string, args []string) (*plugin, error) {
if glog.V(9) {
var argstr string
for _, arg := range args {
argstr += " " + arg
}
glog.V(9).Infof("Launching plugin '%v' from '%v' with args '%v'", prefix, bin, argstr)
}
// Try to execute the binary. // Try to execute the binary.
plug, err := execPlugin(host, bin) plug, err := execPlugin(bin, args)
if err != nil { if err != nil {
// If we failed simply because we couldn't load the binary, return nil rather than an error. // If we failed simply because we couldn't load the binary, return nil rather than an error.
if execerr, isexecerr := err.(*exec.Error); isexecerr && execerr.Err == exec.ErrNotFound { if execerr, isexecerr := err.(*exec.Error); isexecerr && execerr.Err == exec.ErrNotFound {
@ -104,13 +113,8 @@ func newPlugin(host Host, ctx *Context, bin string, prefix string) (*plugin, err
return plug, nil return plug, nil
} }
func execPlugin(host Host, bin string) (*plugin, error) { func execPlugin(bin string, args []string) (*plugin, error) {
// Flow the logging information if set. // Flow the logging information if set.
var args []string
// Append the argument that tells the plugin the address for the engine.
args = append(args, host.ServerAddr())
if cmdutil.LogFlow { if cmdutil.LogFlow {
if cmdutil.LogToStderr { if cmdutil.LogToStderr {
args = append(args, "--logtostderr") args = append(args, "--logtostderr")

View file

@ -30,7 +30,7 @@ type provider struct {
func NewProvider(host Host, ctx *Context, pkg tokens.Package) (Provider, error) { func NewProvider(host Host, ctx *Context, pkg tokens.Package) (Provider, error) {
// Go ahead and attempt to load the plugin from the PATH. // Go ahead and attempt to load the plugin from the PATH.
srvexe := ProviderPluginPrefix + strings.Replace(string(pkg), tokens.QNameDelimiter, "_", -1) srvexe := ProviderPluginPrefix + strings.Replace(string(pkg), tokens.QNameDelimiter, "_", -1)
plug, err := newPlugin(host, ctx, srvexe, fmt.Sprintf("resource[%v]", pkg)) plug, err := newPlugin(ctx, srvexe, fmt.Sprintf("resource[%v]", pkg), []string{host.ServerAddr()})
if err != nil { if err != nil {
return nil, err return nil, err
} else if plug == nil { } else if plug == nil {