Improve tracing support. (#3238)
* Fix some tracing issues. - Add endpoints for `startUpdate` and `postEngineEventsBatch` so that spans for these invocations have proper names - Inject a tracing span when walking a plan so that resource operations are properly parented - When handling gRPC calls, inject a tracing span into the call's metadata if no span is already present so that resource monitor and engine spans are properly parented - Do not trace client gRPC invocations of the empty method so that these calls (which are used to determine server availability) do not muddy the trace. Note that I tried parenting these spans appropriately, but doing so broke the trace entirely. With these changes, the only unparented span in a typical Pulumi invocation is a single call to `getUser`. This span is unparented because that call does not have a context available. Plumbing a context into that particular call is surprisingly tricky, as it is often called by other context-less functions. * Make tracing support more flexible. - Add support for writing trace data to a local file using Appdash - Add support for viewing Appdash traces via the CLI
This commit is contained in:
parent
48a00b507d
commit
82204230e1
|
@ -3,6 +3,9 @@ CHANGELOG
|
|||
|
||||
## HEAD (Unreleased)
|
||||
|
||||
- Support emitting high-level execution trace data to a file and add a debug-only command to view trace data.
|
||||
[#3238](https://github.com/pulumi/pulumi/pull/3238)
|
||||
|
||||
## 1.1.0 (2019-09-11)
|
||||
|
||||
- Fix a bug that caused the Python runtime to ignore unhandled exceptions and erroneously report that a Pulumi program executed successfully.
|
||||
|
|
|
@ -144,7 +144,7 @@ func NewPulumiCmd() *cobra.Command {
|
|||
cmd.PersistentFlags().BoolVar(&cmdutil.DisableInteractive, "non-interactive", false,
|
||||
"Disable interactive mode for all commands")
|
||||
cmd.PersistentFlags().StringVar(&tracing, "tracing", "",
|
||||
"Emit tracing to a Zipkin-compatible tracing endpoint")
|
||||
"Emit tracing to the specified endpoint. Use the `file:` scheme to write tracing data to a local file")
|
||||
cmd.PersistentFlags().StringVar(&profiling, "profiling", "",
|
||||
"Emit CPU and memory profiles and an execution trace to '[filename].[pid].{cpu,mem,trace}', respectively")
|
||||
cmd.PersistentFlags().IntVarP(&verbose, "verbose", "v", 0,
|
||||
|
@ -188,6 +188,8 @@ func NewPulumiCmd() *cobra.Command {
|
|||
cmd.AddCommand(newQueryCmd())
|
||||
// - Policy Management Commands:
|
||||
cmd.AddCommand(newPolicyCmd())
|
||||
// - Diagnostic Commands:
|
||||
cmd.AddCommand(newViewTraceCmd())
|
||||
}
|
||||
|
||||
return cmd
|
||||
|
|
82
cmd/view-trace.go
Normal file
82
cmd/view-trace.go
Normal file
|
@ -0,0 +1,82 @@
|
|||
// Copyright 2016-2018, Pulumi Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"sourcegraph.com/sourcegraph/appdash"
|
||||
"sourcegraph.com/sourcegraph/appdash/traceapp"
|
||||
|
||||
"github.com/pulumi/pulumi/pkg/util/cmdutil"
|
||||
"github.com/pulumi/pulumi/pkg/util/contract"
|
||||
)
|
||||
|
||||
func readTrace(path string, store io.ReaderFrom) error {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer contract.IgnoreClose(f)
|
||||
_, err = store.ReadFrom(f)
|
||||
return err
|
||||
}
|
||||
|
||||
func newViewTraceCmd() *cobra.Command {
|
||||
var port int
|
||||
var cmd = &cobra.Command{
|
||||
Use: "view-trace [trace-file]",
|
||||
Short: "Display a trace from the Pulumi CLI",
|
||||
Long: "Display a trace from the Pulumi CLI.\n" +
|
||||
"\n" +
|
||||
"This command is used to display execution traces collected by a prior\n" +
|
||||
"invocation of the Pulumi CLI.\n" +
|
||||
"\n" +
|
||||
"This command loads trace data from the indicated file and starts a\n" +
|
||||
"webserver to display the trace. By default, this server will listen\n" +
|
||||
"port 8008; the --port flag can be used to change this if necessary.",
|
||||
Args: cmdutil.ExactArgs(1),
|
||||
Run: cmdutil.RunFunc(func(cmd *cobra.Command, args []string) error {
|
||||
url, err := url.Parse(fmt.Sprintf("http://localhost:%d", port))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
store := appdash.NewMemoryStore()
|
||||
if err := readTrace(args[0], store); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
app, err := traceapp.New(nil, url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
app.Store, app.Queryer = store, store
|
||||
|
||||
fmt.Printf("Displaying trace at %v\n", url)
|
||||
return http.ListenAndServe(fmt.Sprintf(":%d", port), app)
|
||||
}),
|
||||
}
|
||||
|
||||
cmd.PersistentFlags().IntVar(&port, "port", 8008,
|
||||
"the port the trace viewer will listen on")
|
||||
|
||||
return cmd
|
||||
}
|
5
go.mod
5
go.mod
|
@ -31,6 +31,7 @@ require (
|
|||
github.com/nbutton23/zxcvbn-go v0.0.0-20171102151520-eafdab6b0663
|
||||
github.com/onsi/ginkgo v1.7.0 // indirect
|
||||
github.com/onsi/gomega v1.4.3 // indirect
|
||||
github.com/opentracing/basictracer-go v1.0.0 // indirect
|
||||
github.com/opentracing/opentracing-go v1.0.2
|
||||
github.com/pkg/errors v0.8.1
|
||||
github.com/reconquest/loreley v0.0.0-20160708080500-2ab6b7470a54
|
||||
|
@ -38,6 +39,8 @@ require (
|
|||
github.com/sabhiram/go-gitignore v0.0.0-20180611051255-d3107576ba94 // indirect
|
||||
github.com/satori/go.uuid v1.2.0
|
||||
github.com/sergi/go-diff v1.0.0
|
||||
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
|
||||
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
|
||||
github.com/sirupsen/logrus v1.3.0 // indirect
|
||||
github.com/skratchdot/open-golang v0.0.0-20160302144031-75fb7ed4208c
|
||||
github.com/spf13/cast v1.2.0
|
||||
|
@ -61,4 +64,6 @@ require (
|
|||
gopkg.in/src-d/go-git-fixtures.v3 v3.4.0 // indirect
|
||||
gopkg.in/src-d/go-git.v4 v4.8.1
|
||||
gopkg.in/yaml.v2 v2.2.2
|
||||
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0
|
||||
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 // indirect
|
||||
)
|
||||
|
|
11
go.sum
11
go.sum
|
@ -129,6 +129,7 @@ github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
|
|||
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||
github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI=
|
||||
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
|
||||
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
|
@ -320,6 +321,8 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
|
|||
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
||||
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
|
||||
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
||||
github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=
|
||||
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
|
||||
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
|
||||
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
|
||||
|
@ -367,6 +370,10 @@ github.com/shirou/gopsutil v0.0.0-20180427012116-c95755e4bcd7/go.mod h1:5b4v6he4
|
|||
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
|
||||
github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e/go.mod h1:TDJrrUr11Vxrven61rcy3hJMUqaf/CLWYhHNPmT14Lk=
|
||||
github.com/shurcooL/go-goon v0.0.0-20170922171312-37c2f522c041/go.mod h1:N5mDOmsrJOB+vfqUK+7DmDyjhSLIIBnXo9lvZJj3MWQ=
|
||||
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk=
|
||||
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
|
||||
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0=
|
||||
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
|
||||
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
|
||||
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
|
@ -559,6 +566,10 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
|
|||
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
pack.ag/amqp v0.8.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
||||
pack.ag/amqp v0.11.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
||||
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 h1:ucqkfpjg9WzSUubAO62csmucvxl4/JeW3F4I4909XkM=
|
||||
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
|
||||
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 h1:e1sMhtVq9AfcEy8AXNb8eSg6gbzfdpYhoNqnPJa+GzI=
|
||||
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k=
|
||||
sourcegraph.com/sourcegraph/go-diff v0.0.0-20171119081133-3f415a150aec h1:wAAdENPXC7bE1oxY4VqSDdhaA+XQ8TgQHsZMMnrXjEk=
|
||||
sourcegraph.com/sourcegraph/go-diff v0.0.0-20171119081133-3f415a150aec/go.mod h1:R09mWeb9JcPbO+A3cYDc11xjz0wp6r9+KnqdqROAoRU=
|
||||
sourcegraph.com/sqs/pbtypes v0.0.0-20160107090929-4d1b9dc7ffc3 h1:hXy8YsgVLDz5mlngKhNHQhAsAGrSp3dlXZN4b0/4UUI=
|
||||
|
|
|
@ -102,9 +102,11 @@ func init() {
|
|||
addEndpoint("POST", "/api/stacks/{orgName}/{projectName}/{stackName}/update", "createUpdate")
|
||||
|
||||
addEndpoint("GET", "/api/stacks/{orgName}/{projectName}/{stackName}/{updateKind}/{updateID}", "getUpdateStatus")
|
||||
addEndpoint("POST", "/api/stacks/{orgName}/{projectName}/{stackName}/{updateKind}/{updateID}", "startUpdate")
|
||||
addEndpoint("PATCH", "/api/stacks/{orgName}/{projectName}/{stackName}/{updateKind}/{updateID}/checkpoint", "patchCheckpoint")
|
||||
addEndpoint("POST", "/api/stacks/{orgName}/{projectName}/{stackName}/{updateKind}/{updateID}/complete", "completeUpdate")
|
||||
addEndpoint("POST", "/api/stacks/{orgName}/{projectName}/{stackName}/{updateKind}/{updateID}/events", "postEngineEvent")
|
||||
addEndpoint("POST", "/api/stacks/{orgName}/{projectName}/{stackName}/{updateKind}/{updateID}/events/batch", "postEngineEventBatch")
|
||||
addEndpoint("POST", "/api/stacks/{orgName}/{projectName}/{stackName}/{updateKind}/{updateID}/renew_lease", "renewLease")
|
||||
|
||||
// APIs for managing `PolicyPack`s.
|
||||
|
|
|
@ -168,6 +168,11 @@ func (planResult *planResult) Chdir() (func(), error) {
|
|||
func (planResult *planResult) Walk(cancelCtx *Context, events deploy.Events, preview bool) result.Result {
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
|
||||
// Inject our opentracing span into the context.
|
||||
if planResult.Ctx.TracingSpan != nil {
|
||||
ctx = opentracing.ContextWithSpan(ctx, planResult.Ctx.TracingSpan)
|
||||
}
|
||||
|
||||
done := make(chan bool)
|
||||
var walkResult result.Result
|
||||
go func() {
|
||||
|
|
|
@ -28,12 +28,13 @@ import (
|
|||
)
|
||||
|
||||
type QueryOptions struct {
|
||||
Events eventEmitter // the channel to write events from the engine to.
|
||||
Diag diag.Sink // the sink to use for diag'ing.
|
||||
StatusDiag diag.Sink // the sink to use for diag'ing status messages.
|
||||
host plugin.Host // the plugin host to use for this query.
|
||||
pwd, main string
|
||||
plugctx *plugin.Context
|
||||
Events eventEmitter // the channel to write events from the engine to.
|
||||
Diag diag.Sink // the sink to use for diag'ing.
|
||||
StatusDiag diag.Sink // the sink to use for diag'ing status messages.
|
||||
host plugin.Host // the plugin host to use for this query.
|
||||
pwd, main string
|
||||
plugctx *plugin.Context
|
||||
tracingSpan opentracing.Span
|
||||
}
|
||||
|
||||
func Query(ctx *Context, u UpdateInfo, opts UpdateOptions) result.Result {
|
||||
|
@ -76,17 +77,18 @@ func Query(ctx *Context, u UpdateInfo, opts UpdateOptions) result.Result {
|
|||
}
|
||||
|
||||
return query(ctx, u, QueryOptions{
|
||||
Events: emitter,
|
||||
Diag: diag,
|
||||
StatusDiag: statusDiag,
|
||||
host: opts.host,
|
||||
pwd: pwd,
|
||||
main: main,
|
||||
plugctx: plugctx,
|
||||
Events: emitter,
|
||||
Diag: diag,
|
||||
StatusDiag: statusDiag,
|
||||
host: opts.host,
|
||||
pwd: pwd,
|
||||
main: main,
|
||||
plugctx: plugctx,
|
||||
tracingSpan: tracingSpan,
|
||||
})
|
||||
}
|
||||
|
||||
func newQuerySource(cancel context.Context, client deploy.BackendClient, u UpdateInfo,
|
||||
func newQuerySource(ctx context.Context, client deploy.BackendClient, u UpdateInfo,
|
||||
opts QueryOptions) (deploy.QuerySource, error) {
|
||||
|
||||
allPlugins, _, err := installPlugins(u.GetProject(), opts.pwd,
|
||||
|
@ -104,9 +106,13 @@ func newQuerySource(cancel context.Context, client deploy.BackendClient, u Updat
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if opts.tracingSpan != nil {
|
||||
ctx = opentracing.ContextWithSpan(ctx, opts.tracingSpan)
|
||||
}
|
||||
|
||||
// If that succeeded, create a new source that will perform interpretation of the compiled program.
|
||||
// TODO[pulumi/pulumi#88]: we are passing `nil` as the arguments map; we need to allow a way to pass these.
|
||||
return deploy.NewQuerySource(cancel, opts.plugctx, client, &deploy.EvalRunInfo{
|
||||
return deploy.NewQuerySource(ctx, opts.plugctx, client, &deploy.EvalRunInfo{
|
||||
Proj: u.GetProject(),
|
||||
Pwd: opts.pwd,
|
||||
Program: opts.main,
|
||||
|
|
|
@ -17,12 +17,14 @@ package deploy
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/blang/semver"
|
||||
pbempty "github.com/golang/protobuf/ptypes/empty"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"time"
|
||||
|
||||
"github.com/pulumi/pulumi/pkg/resource"
|
||||
"github.com/pulumi/pulumi/pkg/resource/deploy/providers"
|
||||
|
@ -87,13 +89,13 @@ func (src *evalSource) Info() interface{} { return src.runinfo }
|
|||
func (src *evalSource) Iterate(
|
||||
ctx context.Context, opts Options, providers ProviderSource) (SourceIterator, result.Result) {
|
||||
|
||||
contract.Ignore(ctx) // TODO[pulumi/pulumi#1714]
|
||||
tracingSpan := opentracing.SpanFromContext(ctx)
|
||||
|
||||
// First, fire up a resource monitor that will watch for and record resource creation.
|
||||
regChan := make(chan *registerResourceEvent)
|
||||
regOutChan := make(chan *registerResourceOutputsEvent)
|
||||
regReadChan := make(chan *readResourceEvent)
|
||||
mon, err := newResourceMonitor(src, providers, regChan, regOutChan, regReadChan)
|
||||
mon, err := newResourceMonitor(src, providers, regChan, regOutChan, regReadChan, tracingSpan)
|
||||
if err != nil {
|
||||
return nil, result.FromError(errors.Wrap(err, "failed to start resource monitor"))
|
||||
}
|
||||
|
@ -403,7 +405,8 @@ var _ SourceResourceMonitor = (*resmon)(nil)
|
|||
|
||||
// newResourceMonitor creates a new resource monitor RPC server.
|
||||
func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *registerResourceEvent,
|
||||
regOutChan chan *registerResourceOutputsEvent, regReadChan chan *readResourceEvent) (*resmon, error) {
|
||||
regOutChan chan *registerResourceOutputsEvent, regReadChan chan *readResourceEvent,
|
||||
tracingSpan opentracing.Span) (*resmon, error) {
|
||||
|
||||
// Create our cancellation channel.
|
||||
cancel := make(chan bool)
|
||||
|
@ -434,7 +437,7 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg
|
|||
pulumirpc.RegisterResourceMonitorServer(srv, resmon)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}, tracingSpan)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"math"
|
||||
|
||||
pbempty "github.com/golang/protobuf/ptypes/empty"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
|
@ -39,7 +40,7 @@ type QuerySource interface {
|
|||
|
||||
// NewQuerySource creates a `QuerySource` for some target runtime environment specified by
|
||||
// `runinfo`, and supported by language plugins provided in `plugctx`.
|
||||
func NewQuerySource(cancel context.Context, plugctx *plugin.Context, client BackendClient,
|
||||
func NewQuerySource(ctx context.Context, plugctx *plugin.Context, client BackendClient,
|
||||
runinfo *EvalRunInfo) (QuerySource, error) {
|
||||
|
||||
// Create a new builtin provider. This provider implements features such as `getStack`.
|
||||
|
@ -50,7 +51,7 @@ func NewQuerySource(cancel context.Context, plugctx *plugin.Context, client Back
|
|||
//
|
||||
// NOTE: Using the queryResourceMonitor here is *VERY* important, as its job is to disallow
|
||||
// resource operations in query mode!
|
||||
mon, err := newQueryResourceMonitor(builtins)
|
||||
mon, err := newQueryResourceMonitor(builtins, opentracing.SpanFromContext(ctx))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to start resource monitor")
|
||||
}
|
||||
|
@ -62,7 +63,7 @@ func NewQuerySource(cancel context.Context, plugctx *plugin.Context, client Back
|
|||
runinfo: runinfo,
|
||||
runLangPlugin: runLangPlugin,
|
||||
finChan: make(chan result.Result),
|
||||
cancel: cancel,
|
||||
cancel: ctx,
|
||||
}
|
||||
|
||||
// Now invoke Run in a goroutine. All subsequent resource creation events will come in over the gRPC channel,
|
||||
|
@ -167,7 +168,7 @@ func runLangPlugin(src *querySource) result.Result {
|
|||
|
||||
// newQueryResourceMonitor creates a new resource monitor RPC server intended to be used in Pulumi's
|
||||
// "query mode".
|
||||
func newQueryResourceMonitor(builtins *builtinProvider) (*queryResmon, error) {
|
||||
func newQueryResourceMonitor(builtins *builtinProvider, tracingSpan opentracing.Span) (*queryResmon, error) {
|
||||
|
||||
// Create our cancellation channel.
|
||||
cancel := make(chan bool)
|
||||
|
@ -184,7 +185,7 @@ func newQueryResourceMonitor(builtins *builtinProvider) (*queryResmon, error) {
|
|||
pulumirpc.RegisterResourceMonitorServer(srv, queryResmon)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}, tracingSpan)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ func newHostServer(host Host, ctx *Context) (*hostServer, error) {
|
|||
lumirpc.RegisterEngineServer(srv, engine)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}, ctx.tracingSpan)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -221,8 +221,8 @@ func execPlugin(bin string, pluginArgs []string, pwd string) (*plugin, error) {
|
|||
args = append(args, "-v="+strconv.Itoa(logging.Verbose))
|
||||
}
|
||||
}
|
||||
// Always flow tracing settings.
|
||||
if cmdutil.TracingEndpoint != "" {
|
||||
// Flow tracing settings if we are using a remote collector.
|
||||
if cmdutil.TracingEndpoint != "" && !cmdutil.TracingToFile {
|
||||
args = append(args, "--tracing", cmdutil.TracingEndpoint)
|
||||
}
|
||||
args = append(args, pluginArgs...)
|
||||
|
|
|
@ -61,7 +61,7 @@ func Main(name string, provMaker func(*HostClient) (pulumirpc.ResourceProviderSe
|
|||
pulumirpc.RegisterResourceProviderServer(srv, prov)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return errors.Errorf("fatal: %v", err)
|
||||
}
|
||||
|
|
|
@ -17,19 +17,38 @@ package cmdutil
|
|||
import (
|
||||
"io"
|
||||
"log"
|
||||
"net/url"
|
||||
"os"
|
||||
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/pulumi/pulumi/pkg/util/contract"
|
||||
jaeger "github.com/uber/jaeger-client-go"
|
||||
"github.com/uber/jaeger-client-go/transport/zipkin"
|
||||
"sourcegraph.com/sourcegraph/appdash"
|
||||
appdash_opentracing "sourcegraph.com/sourcegraph/appdash/opentracing"
|
||||
)
|
||||
|
||||
// TracingEndpoint is the Zipkin-compatible tracing endpoint where tracing data will be sent.
|
||||
var TracingEndpoint string
|
||||
var TracingToFile bool
|
||||
var TracingRootSpan opentracing.Span
|
||||
|
||||
var traceCloser io.Closer
|
||||
|
||||
type localStore struct {
|
||||
path string
|
||||
store *appdash.MemoryStore
|
||||
}
|
||||
|
||||
func (s *localStore) Close() error {
|
||||
f, err := os.Create(s.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer contract.IgnoreClose(f)
|
||||
return s.store.Write(f)
|
||||
}
|
||||
|
||||
func IsTracingEnabled() bool {
|
||||
return TracingEndpoint != ""
|
||||
}
|
||||
|
@ -44,25 +63,58 @@ func InitTracing(name, rootSpanName, tracingEndpoint string) {
|
|||
// Store the tracing endpoint
|
||||
TracingEndpoint = tracingEndpoint
|
||||
|
||||
// Jaeger tracer can be initialized with a transport that will
|
||||
// report tracing Spans to a Zipkin backend
|
||||
transport, err := zipkin.NewHTTPTransport(
|
||||
tracingEndpoint,
|
||||
zipkin.HTTPBatchSize(1),
|
||||
zipkin.HTTPLogger(jaeger.StdLogger),
|
||||
)
|
||||
endpointURL, err := url.Parse(tracingEndpoint)
|
||||
if err != nil {
|
||||
log.Fatalf("Cannot initialize HTTP transport: %v", err)
|
||||
log.Fatalf("invalid tracing endpoint: %v", err)
|
||||
}
|
||||
|
||||
// create Jaeger tracer
|
||||
tracer, closer := jaeger.NewTracer(
|
||||
name,
|
||||
jaeger.NewConstSampler(true), // sample all traces
|
||||
jaeger.NewRemoteReporter(transport))
|
||||
var tracer opentracing.Tracer
|
||||
switch {
|
||||
case endpointURL.Scheme == "file":
|
||||
// If the endpoint is a file:// URL, use a local tracer.
|
||||
TracingToFile = true
|
||||
|
||||
// Store the closer so that we can flush the Jaeger span cache on process exit
|
||||
traceCloser = closer
|
||||
path := endpointURL.Path
|
||||
if path == "" {
|
||||
path = endpointURL.Opaque
|
||||
}
|
||||
if path == "" {
|
||||
log.Fatalf("invalid tracing endpoint: %v", err)
|
||||
}
|
||||
|
||||
store := &localStore{
|
||||
path: path,
|
||||
store: appdash.NewMemoryStore(),
|
||||
}
|
||||
traceCloser = store
|
||||
|
||||
collector := appdash.NewLocalCollector(store.store)
|
||||
tracer = appdash_opentracing.NewTracer(collector)
|
||||
case endpointURL.Scheme == "tcp":
|
||||
// If the endpoint scheme is tcp, use an Appdash endpoint.
|
||||
collector := appdash.NewRemoteCollector(tracingEndpoint)
|
||||
traceCloser = collector
|
||||
tracer = appdash_opentracing.NewTracer(collector)
|
||||
default:
|
||||
// Jaeger tracer can be initialized with a transport that will
|
||||
// report tracing Spans to a Zipkin backend
|
||||
transport, err := zipkin.NewHTTPTransport(
|
||||
tracingEndpoint,
|
||||
zipkin.HTTPBatchSize(1),
|
||||
zipkin.HTTPLogger(jaeger.StdLogger),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("Cannot initialize HTTP transport: %v", err)
|
||||
}
|
||||
|
||||
// create Jaeger tracer
|
||||
t, closer := jaeger.NewTracer(
|
||||
name,
|
||||
jaeger.NewConstSampler(true), // sample all traces
|
||||
jaeger.NewRemoteReporter(transport))
|
||||
|
||||
tracer, traceCloser = t, closer
|
||||
}
|
||||
|
||||
// Set the ambient tracer
|
||||
opentracing.SetGlobalTracer(tracer)
|
||||
|
|
|
@ -15,20 +15,72 @@
|
|||
package rpcutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/pulumi/pulumi/pkg/util/contract"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
// metadataReaderWriter satisfies both the opentracing.TextMapReader and
|
||||
// opentracing.TextMapWriter interfaces.
|
||||
type metadataReaderWriter struct {
|
||||
metadata.MD
|
||||
}
|
||||
|
||||
func (w metadataReaderWriter) Set(key, val string) {
|
||||
// The GRPC HPACK implementation rejects any uppercase keys here.
|
||||
//
|
||||
// As such, since the HTTP_HEADERS format is case-insensitive anyway, we
|
||||
// blindly lowercase the key (which is guaranteed to work in the
|
||||
// Inject/Extract sense per the OpenTracing spec).
|
||||
key = strings.ToLower(key)
|
||||
w.MD[key] = append(w.MD[key], val)
|
||||
}
|
||||
|
||||
func (w metadataReaderWriter) ForeachKey(handler func(key, val string) error) error {
|
||||
for k, vals := range w.MD {
|
||||
for _, v := range vals {
|
||||
if err := handler(k, v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OpenTracingServerInterceptor provides a default gRPC server interceptor for emitting tracing to the global
|
||||
// OpenTracing tracer.
|
||||
func OpenTracingServerInterceptor() grpc.UnaryServerInterceptor {
|
||||
return otgrpc.OpenTracingServerInterceptor(
|
||||
func OpenTracingServerInterceptor(parentSpan opentracing.Span) grpc.UnaryServerInterceptor {
|
||||
tracingInterceptor := otgrpc.OpenTracingServerInterceptor(
|
||||
// Use the globally installed tracer
|
||||
opentracing.GlobalTracer(),
|
||||
// Log full payloads along with trace spans
|
||||
otgrpc.LogPayloads(),
|
||||
)
|
||||
if parentSpan == nil {
|
||||
return tracingInterceptor
|
||||
}
|
||||
spanContext := parentSpan.Context()
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (interface{}, error) {
|
||||
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
md = metadata.New(nil)
|
||||
}
|
||||
carrier := metadataReaderWriter{md}
|
||||
_, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, carrier)
|
||||
if err == opentracing.ErrSpanContextNotFound {
|
||||
contract.IgnoreError(opentracing.GlobalTracer().Inject(spanContext, opentracing.HTTPHeaders, carrier))
|
||||
}
|
||||
return tracingInterceptor(ctx, req, info, handler)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// OpenTracingClientInterceptor provides a default gRPC client interceptor for emitting tracing to the global
|
||||
|
@ -39,5 +91,8 @@ func OpenTracingClientInterceptor() grpc.UnaryClientInterceptor {
|
|||
opentracing.GlobalTracer(),
|
||||
// Log full payloads along with trace spans
|
||||
otgrpc.LogPayloads(),
|
||||
)
|
||||
// Do not trace calls to the empty method
|
||||
otgrpc.IncludingSpans(func(_ opentracing.SpanContext, method string, _, _ interface{}) bool {
|
||||
return method != ""
|
||||
}))
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/reflection"
|
||||
|
@ -36,7 +37,9 @@ func IsBenignCloseErr(err error) bool {
|
|||
// the port number. The return values are: the chosen port (the same as supplied if non-0), a channel that may
|
||||
// eventually return an error, and an error, in case something went wrong. The channel is non-nil and waits until
|
||||
// the server is finished, in the case of a successful launch of the RPC server.
|
||||
func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error) (int, chan error, error) {
|
||||
func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error,
|
||||
parentSpan opentracing.Span) (int, chan error, error) {
|
||||
|
||||
// Listen on a TCP port, but let the kernel choose a free port for us.
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(port))
|
||||
if err != nil {
|
||||
|
@ -44,7 +47,7 @@ func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error) (in
|
|||
}
|
||||
|
||||
// Now new up a gRPC server and register any RPC interfaces the caller wants.
|
||||
srv := grpc.NewServer(grpc.UnaryInterceptor(OpenTracingServerInterceptor()))
|
||||
srv := grpc.NewServer(grpc.UnaryInterceptor(OpenTracingServerInterceptor(parentSpan)))
|
||||
for _, register := range registers {
|
||||
if err := register(srv); err != nil {
|
||||
return port, nil, errors.Errorf("failed to register RPC handler: %v", err)
|
||||
|
|
|
@ -59,7 +59,7 @@ func main() {
|
|||
pulumirpc.RegisterLanguageRuntimeServer(srv, host)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}, nil)
|
||||
if err != nil {
|
||||
cmdutil.Exit(errors.Wrapf(err, "could not start language host RPC server"))
|
||||
}
|
||||
|
|
|
@ -114,7 +114,7 @@ func main() {
|
|||
pulumirpc.RegisterLanguageRuntimeServer(srv, host)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}, nil)
|
||||
if err != nil {
|
||||
cmdutil.Exit(errors.Wrapf(err, "could not start language host RPC server"))
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ func main() {
|
|||
pulumirpc.RegisterLanguageRuntimeServer(srv, host)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}, nil)
|
||||
if err != nil {
|
||||
cmdutil.Exit(errors.Wrapf(err, "could not start language host RPC server"))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue