Compare commits

...

4 commits

Author SHA1 Message Date
Chris Smith 7d2c9ab5ce Remove Sames 2018-11-05 12:10:05 -08:00
Chris Smith 7ed385b45b Simplify API types 2018-11-04 19:31:03 -08:00
Chris Smith 56a58dba29 WIP - Persisting engine events as part of Pulumi API 2018-11-04 19:28:55 -08:00
Chris Smith afd5391551 API types for recording engine events 2018-11-02 19:32:06 -07:00
6 changed files with 215 additions and 23 deletions

View file

@ -18,6 +18,7 @@ import (
"encoding/json"
"github.com/pulumi/pulumi/pkg/diag/colors"
"github.com/pulumi/pulumi/pkg/resource"
)
// CreateUpdateConfig describes the configuration data for an request to `POST /updates`.
@ -209,3 +210,60 @@ type AppendUpdateLogEntryRequest struct {
Kind string `json:"kind"`
Fields map[string]interface{} `json:"fields"`
}
// UpdateEngineEvent describes any meaningful event during an update. It is a discriminated union
// with exactly one field being non-nil. These generally map to individual engine.Event structs.
type UpdateEngineEvent struct {
ResourceOpStarted *ResourceOpStartedEvent `json:"resourceOpStarted,omitempty"`
ResourceOpFinished *ResourceOpFinishedEvent `json:"resourceOpFinished,omitempty"`
ResourceOpFailed *ResourceOpFailedEvent `json:"resourceOpFailed,omitempty"`
Summary *UpdateSummaryEvent `json:"summary,omitempty"`
}
// ResourceOpStartedEvent is fired just before a resource-based operation is about to start.
type ResourceOpStartedEvent struct {
URN string `json:"urn"`
Type string `json:"type"`
Operation string `json:"operation"`
}
// ResourcePropertyChange describes a resource property changed as the result of an update.
// A nil old or new means the resource property was created or deleted.
type ResourcePropertyChange struct {
Property string `json:"property"`
OldValue *string `json:"oldValue,omitempty"`
NewValue *string `json:"newValue,omitempty"`
}
// ResourceOpFinishedEvent is fired when a resource-based operation successfully completes.
type ResourceOpFinishedEvent struct {
URN string `json:"urn"`
Type string `json:"type"`
Operation string `json:"operation"`
// Changes that occurred as part of the operation, nil for creates/deletes.
// TODO: Replace this with a stable type, since as-is changes to the resource
// package would break API compatibility with the service. (It may not understand
// newer/older representations of the ObjectDiff.)
Changes *resource.ObjectDiff `json:"changes,omitempty"`
}
// ResourceOpFailedEvent is fired when a resource-based operation
type ResourceOpFailedEvent struct {
URN string `json:"urn"`
Type string `json:"type"`
Operation string `json:"operation"`
}
// UpdateSummaryEvent is emitted at the end of an update.
type UpdateSummaryEvent struct {
// Duration is the number of seconds the update was executing.
Duration int `json:"duration"`
// ResourceChanges contains the count for resource change by type. The keys are deploy.StepOp,
// which is intentionally not exported with this API.
ResourceChanges map[string]int `json:"resourceChanges"`
}

View file

@ -732,24 +732,27 @@ func (b *cloudBackend) runEngineAction(
return nil, err
}
// The backend.SnapshotManager and backend.SnapshotPersister will keep track of any changes to
// the Snapshot (checkpoint file) in the HTTP backend.
persister := b.newSnapshotPersister(ctx, u.update, u.tokenSource)
manager := backend.NewSnapshotManager(persister, u.GetTarget().Snapshot)
snapshotManager := backend.NewSnapshotManager(persister, u.GetTarget().Snapshot)
// The engineEvents channel receives all events from the engine, which we then forward onto other
// channels for actual processing.
engineEvents := make(chan engine.Event)
eventsDone := make(chan bool)
// displayEvents renders the event to the console and Pulumi service.
displayEvents := make(chan engine.Event)
displayDone := make(chan bool)
go u.RecordAndDisplayEvents(
backend.ActionLabel(kind, dryRun), kind, stackRef, op,
displayEvents, displayDone, op.Opts.Display, dryRun)
engineEvents := make(chan engine.Event)
scope := op.Scopes.NewScope(engineEvents, dryRun)
eventsDone := make(chan bool)
// Main engineEvents loop.
go func() {
// Pull in all events from the engine and send to them to the two listeners.
for e := range engineEvents {
displayEvents <- e
if callerEventsOpt != nil {
callerEventsOpt <- e
}
@ -760,7 +763,12 @@ func (b *cloudBackend) runEngineAction(
// Depending on the action, kick off the relevant engine activity. Note that we don't immediately check and
// return error conditions, because we will do so below after waiting for the display channels to close.
engineCtx := &engine.Context{Cancel: scope.Context(), Events: engineEvents, SnapshotManager: manager}
cancellationScope := op.Scopes.NewScope(engineEvents, dryRun)
engineCtx := &engine.Context{
Cancel: cancellationScope.Context(),
Events: engineEvents,
SnapshotManager: snapshotManager,
}
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
engineCtx.ParentSpan = parentSpan.Context()
}
@ -779,23 +787,23 @@ func (b *cloudBackend) runEngineAction(
contract.Failf("Unrecognized update kind: %s", kind)
}
// Wait for the display to finish showing all the events.
// Wait for dependent channels to finish processing engineEvents before closing.
<-displayDone
scope.Close() // Don't take any cancellations anymore, we're shutting down.
cancellationScope.Close() // Don't take any cancellations anymore, we're shutting down.
close(engineEvents)
close(displayDone)
contract.IgnoreClose(manager)
contract.IgnoreClose(snapshotManager)
// Make sure that the goroutine writing to displayEvents and callerEventsOpt
// has exited before proceeding
<-eventsDone
close(displayEvents)
// Mark the update as complete.
status := apitype.UpdateStatusSucceeded
if err != nil {
status = apitype.UpdateStatusFailed
}
completeErr := u.Complete(status)
if completeErr != nil {
err = multierror.Append(err, errors.Wrap(completeErr, "failed to complete update"))

View file

@ -519,3 +519,11 @@ func (pc *Client) AppendUpdateLogEntry(ctx context.Context, update UpdateIdentif
return pc.updateRESTCall(ctx, "POST", getUpdatePath(update, "log"), nil, req, nil,
updateAccessToken(token), httpCallOptions{RetryAllMethods: true})
}
// RecordEngineEvent posts an engine event to the Pulumi service.
func (pc *Client) RecordEngineEvent(ctx context.Context, update UpdateIdentifier, event apitype.UpdateEngineEvent, token string) error {
return pc.updateRESTCall(
ctx, "POST", getUpdatePath(update, "events"),
nil, event, nil,
updateAccessToken(token), httpCallOptions{GzipCompress: true})
}

View file

@ -19,6 +19,8 @@ import (
"fmt"
"time"
"github.com/pulumi/pulumi/pkg/util/contract"
"github.com/pkg/errors"
"github.com/pulumi/pulumi/pkg/apitype"
"github.com/pulumi/pulumi/pkg/backend"
@ -136,15 +138,31 @@ func (u *cloudUpdate) Complete(status apitype.UpdateStatus) error {
return u.backend.client.CompleteUpdate(u.context, u.update, status, token)
}
// recordEvent will record the event with the Pulumi service, enabling things like viewing
// the rendered update logs or drilling into the specific changes made during an update.
func (u *cloudUpdate) recordEvent(
action apitype.UpdateKind, event engine.Event, seen map[resource.URN]engine.StepEventMetadata,
opts display.Options) error {
// If we don't have a token source, we can't perform any mutations.
if u.tokenSource == nil {
return nil
contract.Assert(u.tokenSource != nil)
token, err := u.tokenSource.GetToken()
if err != nil {
return err
}
// We emit the event data in two ways to the Pulumi service. First, we distill it down into
// a summary "UpdateEngineEvent". We also run the event through the display package to render
// and upload the full update log.
updateEvent, convErr := convertEngineEvent(event)
if convErr != nil {
return errors.Wrap(convErr, "converting engine event")
}
if updateEvent != nil {
if err = u.backend.client.RecordEngineEvent(u.context, u.update, *updateEvent, token); err != nil {
return err
}
}
// The next place we upload the event runs it through the diff view, and then uploads the rendered log.
fields := make(map[string]interface{})
kind := string(apitype.StdoutEvent)
if event.Type == engine.DiagEvent {
@ -169,11 +187,6 @@ func (u *cloudUpdate) recordEvent(
}
msg = msg[len(chunk):]
token, err := u.tokenSource.GetToken()
if err != nil {
return err
}
fields["text"] = chunk
fields["colorize"] = colors.Always
if err = u.backend.client.AppendUpdateLogEntry(u.context, u.update, kind, fields, token); err != nil {
@ -183,6 +196,8 @@ func (u *cloudUpdate) recordEvent(
return nil
}
// RecordAndDisplayEvents inspects engine events from the given channel, and prints them to the CLI as well as
// posting them to the Pulumi service.
func (u *cloudUpdate) RecordAndDisplayEvents(
label string, action apitype.UpdateKind, stackRef backend.StackReference, op backend.UpdateOperation,
events <-chan engine.Event, done chan<- bool, opts display.Options, isPreview bool) {
@ -210,7 +225,7 @@ func (u *cloudUpdate) RecordAndDisplayEvents(
}
if e.Type == engine.CancelEvent {
return
break
}
}
}
@ -292,3 +307,82 @@ func (b *cloudBackend) getTarget(ctx context.Context, stackRef backend.StackRefe
Snapshot: snapshot,
}, nil
}
// convertEngineEvent will try to convert the engine event to an event to send to the
// Pulumi service. Returns nil, nil if the event has no analog to be converted. Or
// nil, error if the engine event has an unknown type.
func convertEngineEvent(e engine.Event) (*apitype.UpdateEngineEvent, error) {
var updateEvent apitype.UpdateEngineEvent
// Error to return if the payload doesn't match expected.
eventTypePayloadMismatch := errors.Errorf("unexpected payload for event type %v", e.Type)
switch e.Type {
case engine.SummaryEvent:
p, ok := e.Payload.(engine.SummaryEventPayload)
if !ok {
return nil, eventTypePayloadMismatch
}
// Convert the resource changes.
changes := make(map[string]int)
for op, count := range p.ResourceChanges {
changes[string(op)] = count
}
updateEvent.Summary = &apitype.UpdateSummaryEvent{
Duration: int(p.Duration.Seconds()),
ResourceChanges: changes,
}
case engine.ResourcePreEvent:
p, ok := e.Payload.(engine.ResourcePreEventPayload)
if !ok {
return nil, eventTypePayloadMismatch
}
updateEvent.ResourceOpStarted = &apitype.ResourceOpStartedEvent{
URN: string(p.Metadata.URN),
Type: string(p.Metadata.Type),
Operation: string(p.Metadata.Op),
}
case engine.ResourceOutputsEvent:
p, ok := e.Payload.(engine.ResourceOutputsEventPayload)
if !ok {
return nil, eventTypePayloadMismatch
}
var changes *resource.ObjectDiff
m := p.Metadata
if m.Old != nil && m.New != nil && m.Old.Outputs != nil && m.New.Outputs != nil {
changes = m.Old.Outputs.Diff(m.New.Outputs)
// Don't send data to the service we won't endup rendering.
changes.RemoveSames()
}
updateEvent.ResourceOpFinished = &apitype.ResourceOpFinishedEvent{
URN: string(p.Metadata.URN),
Type: string(p.Metadata.Type),
Operation: string(p.Metadata.Op),
Changes: changes,
}
case engine.ResourceOperationFailed:
p, ok := e.Payload.(engine.ResourceOperationFailedPayload)
if !ok {
return nil, eventTypePayloadMismatch
}
updateEvent.ResourceOpFailed = &apitype.ResourceOpFailedEvent{
URN: string(p.Metadata.URN),
Type: string(p.Metadata.Type),
Operation: string(p.Metadata.Op),
}
// Remaining event types are ignored, and error on any unexpected ones.
case engine.CancelEvent, engine.StdoutColorEvent, engine.DiagEvent:
return nil, nil
default:
return nil, errors.Errorf("unknown event type %q", e.Type)
}
return &updateEvent, nil
}

View file

@ -74,6 +74,7 @@ type PreludeEventPayload struct {
Config map[string]string // the keys and values for config. For encrypted config, the values may be blinded
}
// SummaryEventPayload is the payload for events with the SummaryEvent type.
type SummaryEventPayload struct {
IsPreview bool // true if this summary is for a plan operation
MaybeCorrupt bool // true if one or more resources may be corrupt

View file

@ -74,6 +74,29 @@ func (diff *ObjectDiff) Keys() []PropertyKey {
return ks
}
// RemoveSames removes all references to "same" property values.
func (diff *ObjectDiff) RemoveSames() {
removeSamesFromValueDiff := func(vd *ValueDiff) {
if arrayDiff := vd.Array; arrayDiff != nil {
arrayDiff.Sames = nil
for k, v := range arrayDiff.Updates {
removeSamesFromValueDiff(&v)
}
}
if objDiff := vd.Object; objDiff != nil {
objDiff.Sames = nil
for k, v := range objDiff.Updates {
removeSamesFromValueDiff(&v)
}
}
}
diff.Sames = nil
for k, v := range diff.Updates {
removeSamesFromValueDiff(&v)
}
}
// ValueDiff holds the results of diffing two property values.
type ValueDiff struct {
Old PropertyValue // the old value.