Merge pull request #418 from pulumi/EventChannel
Ensure a plugin's std{out,err} streams are drained in Close().
This commit is contained in:
commit
ea98499548
1 changed files with 18 additions and 4 deletions
|
@ -24,6 +24,9 @@ import (
|
|||
)
|
||||
|
||||
type plugin struct {
|
||||
stdoutDone <-chan bool
|
||||
stderrDone <-chan bool
|
||||
|
||||
Conn *grpc.ClientConn
|
||||
Proc *os.Process
|
||||
Stdin io.WriteCloser
|
||||
|
@ -59,7 +62,7 @@ func newPlugin(ctx *Context, bin string, prefix string, args []string) (*plugin,
|
|||
contract.Assert(plug != nil)
|
||||
|
||||
// For now, we will spawn goroutines that will spew STDOUT/STDERR to the relevant diag streams.
|
||||
runtrace := func(t io.Reader, stderr bool) {
|
||||
runtrace := func(t io.Reader, stderr bool, done chan<- bool) {
|
||||
reader := bufio.NewReader(t)
|
||||
for {
|
||||
line, readerr := reader.ReadString('\n')
|
||||
|
@ -73,10 +76,14 @@ func newPlugin(ctx *Context, bin string, prefix string, args []string) (*plugin,
|
|||
ctx.Diag.Infof(diag.Message("%s"), msg)
|
||||
}
|
||||
}
|
||||
close(done)
|
||||
}
|
||||
|
||||
stderrDone, stdoutDone := make(chan bool), make(chan bool)
|
||||
plug.stderrDone, plug.stdoutDone = stderrDone, stdoutDone
|
||||
|
||||
// Set up a tracer on stderr before going any further, since important errors might get communicated this way.
|
||||
go runtrace(plug.Stderr, true)
|
||||
go runtrace(plug.Stderr, true, stderrDone)
|
||||
|
||||
// Now that we have a process, we expect it to write a single line to STDOUT: the port it's listening on. We only
|
||||
// read a byte at a time so that STDOUT contains everything after the first newline.
|
||||
|
@ -107,7 +114,7 @@ func newPlugin(ctx *Context, bin string, prefix string, args []string) (*plugin,
|
|||
}
|
||||
|
||||
// After reading the port number, set up a tracer on stdout just so other output doesn't disappear.
|
||||
go runtrace(plug.Stdout, false)
|
||||
go runtrace(plug.Stdout, false, stdoutDone)
|
||||
|
||||
// Now that we have the port, go ahead and create a gRPC client connection to it.
|
||||
conn, err := grpc.Dial(":"+port, grpc.WithInsecure())
|
||||
|
@ -191,6 +198,13 @@ func execPlugin(bin string, args []string) (*plugin, error) {
|
|||
func (p *plugin) Close() error {
|
||||
closerr := p.Conn.Close()
|
||||
contract.IgnoreError(closerr)
|
||||
|
||||
// IDEA: consider a more graceful termination than just SIGKILL.
|
||||
return p.Proc.Kill()
|
||||
err := p.Proc.Kill()
|
||||
|
||||
// Wait for stdout and stderr to drain
|
||||
<-p.stdoutDone
|
||||
<-p.stderrDone
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue