pulumi/pkg/util/rpcutil/serve.go
Luke Hoban af5298f4aa
Initial work on tracing support (#521)
Adds OpenTracing in the Pulumi engine and plugin + langhost subprocesses.

We currently create a single root span for any `Engine.plan` operation, i.e. a single `preview`, `update`, `destroy`, etc.

The only sub-spans we currently create are at gRPC boundaries, both on the client and server sides and on both the langhost and provider plugin interfaces.

We could extend this to include spans for any other semantically meaningful sections of compute inside the engine, though initial examples show we get pretty good granularity of coverage by focusing on the gRPC boundaries.

In the future, this should be easily extensible to HTTP boundaries and to track other bulky I/O like datastore read/writes once we hook up to the PPC and Pulumi Cloud.

We expose a `--trace <endpoint>` option to enable tracing on the CLI, which we will aim to thread through to subprocesses.

We currently support sending tracing data to a Zipkin-compatible endpoint.  This has been validated with both Zipkin and Jaeger UIs.

We do not yet have any tracing inside the TypeScript side of the JS langhost RPC interface.  There is not yet automatic gRPC OpenTracing instrumentation (though it looks like it's in progress now) - so we would need to manually create meaningful spans on that side of the interface.
2017-11-08 17:08:51 -08:00

73 lines
2.6 KiB
Go

// Copyright 2016-2017, Pulumi Corporation. All rights reserved.
package rpcutil
import (
	"net"
	"strconv"
	"strings"

	"github.com/pkg/errors"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
)
// IsBenignCloseErr returns true if the error is "expected" upon shutdown of the server.
func IsBenignCloseErr(err error) bool {
	msg := err.Error()
	return strings.HasSuffix(msg, "use of closed network connection") ||
		strings.HasSuffix(msg, "grpc: the server has been stopped")
}
// Serve creates a new gRPC server, calls out to the supplied registration functions to bind interfaces, and then
// listens on the supplied TCP port. If the caller wishes for the kernel to choose a free port automatically, pass 0 as
// the port number. The return values are: the chosen port (the same as supplied if non-0), a channel that may
// eventually return an error, and an error in case something went wrong. On a successful launch, the channel is
// non-nil and does not yield a value until the server has finished serving.
func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error) (int, chan error, error) {
	// Listen on a TCP port (if port is 0, the kernel will choose a free one for us).
	lis, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(port))
	if err != nil {
		return port, nil, errors.Errorf("failed to listen on TCP port ':%v': %v", port, err)
	}

	// Now new up a gRPC server and register any RPC interfaces the caller wants.
	srv := grpc.NewServer(grpc.UnaryInterceptor(OpenTracingServerInterceptor()))
	for _, register := range registers {
		if err := register(srv); err != nil {
			return port, nil, errors.Errorf("failed to register RPC handler: %v", err)
		}
	}
	reflection.Register(srv) // enable reflection.
	// If the port was 0, look up the port the kernel chose by accessing the underlying TCP listener/address.
	if port == 0 {
		tcpl := lis.(*net.TCPListener)
		tcpa := tcpl.Addr().(*net.TCPAddr)
		port = tcpa.Port
	}
	// If the caller provided a cancellation channel, start a goroutine that will gracefully terminate the gRPC
	// server when that channel is closed or receives a `true` value.
	if cancel != nil {
		go func() {
			for v, ok := <-cancel; !v && ok; v, ok = <-cancel {
			}
			srv.GracefulStop()
		}()
	}
	// Finally, serve; this returns only once the server shuts down (e.g., due to a signal).
	done := make(chan error)
	go func() {
		if err := srv.Serve(lis); err != nil && !IsBenignCloseErr(err) {
			done <- errors.Errorf("stopped serving: %v", err)
		} else {
			done <- nil // send a signal so the caller knows we're done, even though it's nil.
		}
		close(done)
	}()

	return port, done, nil
}