Compare commits
4 commits
master
...
features/p
Author | SHA1 | Date | |
---|---|---|---|
|
7d2c9ab5ce | ||
|
7ed385b45b | ||
|
56a58dba29 | ||
|
afd5391551 |
|
@ -18,6 +18,7 @@ import (
|
|||
"encoding/json"
|
||||
|
||||
"github.com/pulumi/pulumi/pkg/diag/colors"
|
||||
"github.com/pulumi/pulumi/pkg/resource"
|
||||
)
|
||||
|
||||
// CreateUpdateConfig describes the configuration data for an request to `POST /updates`.
|
||||
|
@ -209,3 +210,60 @@ type AppendUpdateLogEntryRequest struct {
|
|||
Kind string `json:"kind"`
|
||||
Fields map[string]interface{} `json:"fields"`
|
||||
}
|
||||
|
||||
// UpdateEngineEvent describes any meaningful event during an update. It is a discriminated union
|
||||
// with exactly one field being non-nil. These generally map to individual engine.Event structs.
|
||||
type UpdateEngineEvent struct {
|
||||
ResourceOpStarted *ResourceOpStartedEvent `json:"resourceOpStarted,omitempty"`
|
||||
ResourceOpFinished *ResourceOpFinishedEvent `json:"resourceOpFinished,omitempty"`
|
||||
ResourceOpFailed *ResourceOpFailedEvent `json:"resourceOpFailed,omitempty"`
|
||||
|
||||
Summary *UpdateSummaryEvent `json:"summary,omitempty"`
|
||||
}
|
||||
|
||||
// ResourceOpStartedEvent is fired just before a resource-based operation is about to start.
|
||||
type ResourceOpStartedEvent struct {
|
||||
URN string `json:"urn"`
|
||||
Type string `json:"type"`
|
||||
|
||||
Operation string `json:"operation"`
|
||||
}
|
||||
|
||||
// ResourcePropertyChange describes a resource property changed as the result of an update.
|
||||
// A nil old or new means the resource property was created or deleted.
|
||||
type ResourcePropertyChange struct {
|
||||
Property string `json:"property"`
|
||||
OldValue *string `json:"oldValue,omitempty"`
|
||||
NewValue *string `json:"newValue,omitempty"`
|
||||
}
|
||||
|
||||
// ResourceOpFinishedEvent is fired when a resource-based operation successfully completes.
|
||||
type ResourceOpFinishedEvent struct {
|
||||
URN string `json:"urn"`
|
||||
Type string `json:"type"`
|
||||
|
||||
Operation string `json:"operation"`
|
||||
|
||||
// Changes that occurred as part of the operation, nil for creates/deletes.
|
||||
// TODO: Replace this with a stable type, since as-is changes to the resource
|
||||
// package would break API compatibility with the service. (It may not understand
|
||||
// newer/older representations of the ObjectDiff.)
|
||||
Changes *resource.ObjectDiff `json:"changes,omitempty"`
|
||||
}
|
||||
|
||||
// ResourceOpFailedEvent is fired when a resource-based operation
|
||||
type ResourceOpFailedEvent struct {
|
||||
URN string `json:"urn"`
|
||||
Type string `json:"type"`
|
||||
|
||||
Operation string `json:"operation"`
|
||||
}
|
||||
|
||||
// UpdateSummaryEvent is emitted at the end of an update.
|
||||
type UpdateSummaryEvent struct {
|
||||
// Duration is the number of seconds the update was executing.
|
||||
Duration int `json:"duration"`
|
||||
// ResourceChanges contains the count for resource change by type. The keys are deploy.StepOp,
|
||||
// which is intentionally not exported with this API.
|
||||
ResourceChanges map[string]int `json:"resourceChanges"`
|
||||
}
|
||||
|
|
|
@ -732,24 +732,27 @@ func (b *cloudBackend) runEngineAction(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// The backend.SnapshotManager and backend.SnapshotPersister will keep track of any changes to
|
||||
// the Snapshot (checkpoint file) in the HTTP backend.
|
||||
persister := b.newSnapshotPersister(ctx, u.update, u.tokenSource)
|
||||
manager := backend.NewSnapshotManager(persister, u.GetTarget().Snapshot)
|
||||
snapshotManager := backend.NewSnapshotManager(persister, u.GetTarget().Snapshot)
|
||||
|
||||
// The engineEvents channel receives all events from the engine, which we then forward onto other
|
||||
// channels for actual processing.
|
||||
engineEvents := make(chan engine.Event)
|
||||
eventsDone := make(chan bool)
|
||||
|
||||
// displayEvents renders the event to the console and Pulumi service.
|
||||
displayEvents := make(chan engine.Event)
|
||||
displayDone := make(chan bool)
|
||||
|
||||
go u.RecordAndDisplayEvents(
|
||||
backend.ActionLabel(kind, dryRun), kind, stackRef, op,
|
||||
displayEvents, displayDone, op.Opts.Display, dryRun)
|
||||
|
||||
engineEvents := make(chan engine.Event)
|
||||
|
||||
scope := op.Scopes.NewScope(engineEvents, dryRun)
|
||||
eventsDone := make(chan bool)
|
||||
// Main engineEvents loop.
|
||||
go func() {
|
||||
// Pull in all events from the engine and send to them to the two listeners.
|
||||
for e := range engineEvents {
|
||||
displayEvents <- e
|
||||
|
||||
if callerEventsOpt != nil {
|
||||
callerEventsOpt <- e
|
||||
}
|
||||
|
@ -760,7 +763,12 @@ func (b *cloudBackend) runEngineAction(
|
|||
|
||||
// Depending on the action, kick off the relevant engine activity. Note that we don't immediately check and
|
||||
// return error conditions, because we will do so below after waiting for the display channels to close.
|
||||
engineCtx := &engine.Context{Cancel: scope.Context(), Events: engineEvents, SnapshotManager: manager}
|
||||
cancellationScope := op.Scopes.NewScope(engineEvents, dryRun)
|
||||
engineCtx := &engine.Context{
|
||||
Cancel: cancellationScope.Context(),
|
||||
Events: engineEvents,
|
||||
SnapshotManager: snapshotManager,
|
||||
}
|
||||
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
|
||||
engineCtx.ParentSpan = parentSpan.Context()
|
||||
}
|
||||
|
@ -779,23 +787,23 @@ func (b *cloudBackend) runEngineAction(
|
|||
contract.Failf("Unrecognized update kind: %s", kind)
|
||||
}
|
||||
|
||||
// Wait for the display to finish showing all the events.
|
||||
// Wait for dependent channels to finish processing engineEvents before closing.
|
||||
<-displayDone
|
||||
scope.Close() // Don't take any cancellations anymore, we're shutting down.
|
||||
cancellationScope.Close() // Don't take any cancellations anymore, we're shutting down.
|
||||
close(engineEvents)
|
||||
close(displayDone)
|
||||
contract.IgnoreClose(manager)
|
||||
contract.IgnoreClose(snapshotManager)
|
||||
|
||||
// Make sure that the goroutine writing to displayEvents and callerEventsOpt
|
||||
// has exited before proceeding
|
||||
<-eventsDone
|
||||
close(displayEvents)
|
||||
|
||||
// Mark the update as complete.
|
||||
status := apitype.UpdateStatusSucceeded
|
||||
if err != nil {
|
||||
status = apitype.UpdateStatusFailed
|
||||
}
|
||||
|
||||
completeErr := u.Complete(status)
|
||||
if completeErr != nil {
|
||||
err = multierror.Append(err, errors.Wrap(completeErr, "failed to complete update"))
|
||||
|
|
|
@ -519,3 +519,11 @@ func (pc *Client) AppendUpdateLogEntry(ctx context.Context, update UpdateIdentif
|
|||
return pc.updateRESTCall(ctx, "POST", getUpdatePath(update, "log"), nil, req, nil,
|
||||
updateAccessToken(token), httpCallOptions{RetryAllMethods: true})
|
||||
}
|
||||
|
||||
// RecordEngineEvent posts an engine event to the Pulumi service.
|
||||
func (pc *Client) RecordEngineEvent(ctx context.Context, update UpdateIdentifier, event apitype.UpdateEngineEvent, token string) error {
|
||||
return pc.updateRESTCall(
|
||||
ctx, "POST", getUpdatePath(update, "events"),
|
||||
nil, event, nil,
|
||||
updateAccessToken(token), httpCallOptions{GzipCompress: true})
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pulumi/pulumi/pkg/util/contract"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pulumi/pulumi/pkg/apitype"
|
||||
"github.com/pulumi/pulumi/pkg/backend"
|
||||
|
@ -136,15 +138,31 @@ func (u *cloudUpdate) Complete(status apitype.UpdateStatus) error {
|
|||
return u.backend.client.CompleteUpdate(u.context, u.update, status, token)
|
||||
}
|
||||
|
||||
// recordEvent will record the event with the Pulumi service, enabling things like viewing
|
||||
// the rendered update logs or drilling into the specific changes made during an update.
|
||||
func (u *cloudUpdate) recordEvent(
|
||||
action apitype.UpdateKind, event engine.Event, seen map[resource.URN]engine.StepEventMetadata,
|
||||
opts display.Options) error {
|
||||
|
||||
// If we don't have a token source, we can't perform any mutations.
|
||||
if u.tokenSource == nil {
|
||||
return nil
|
||||
contract.Assert(u.tokenSource != nil)
|
||||
token, err := u.tokenSource.GetToken()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We emit the event data in two ways to the Pulumi service. First, we distill it down into
|
||||
// a summary "UpdateEngineEvent". We also run the event through the display package to render
|
||||
// and upload the full update log.
|
||||
updateEvent, convErr := convertEngineEvent(event)
|
||||
if convErr != nil {
|
||||
return errors.Wrap(convErr, "converting engine event")
|
||||
}
|
||||
if updateEvent != nil {
|
||||
if err = u.backend.client.RecordEngineEvent(u.context, u.update, *updateEvent, token); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// The next place we upload the event runs it through the diff view, and then uploads the rendered log.
|
||||
fields := make(map[string]interface{})
|
||||
kind := string(apitype.StdoutEvent)
|
||||
if event.Type == engine.DiagEvent {
|
||||
|
@ -169,11 +187,6 @@ func (u *cloudUpdate) recordEvent(
|
|||
}
|
||||
msg = msg[len(chunk):]
|
||||
|
||||
token, err := u.tokenSource.GetToken()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fields["text"] = chunk
|
||||
fields["colorize"] = colors.Always
|
||||
if err = u.backend.client.AppendUpdateLogEntry(u.context, u.update, kind, fields, token); err != nil {
|
||||
|
@ -183,6 +196,8 @@ func (u *cloudUpdate) recordEvent(
|
|||
return nil
|
||||
}
|
||||
|
||||
// RecordAndDisplayEvents inspects engine events from the given channel, and prints them to the CLI as well as
|
||||
// posting them to the Pulumi service.
|
||||
func (u *cloudUpdate) RecordAndDisplayEvents(
|
||||
label string, action apitype.UpdateKind, stackRef backend.StackReference, op backend.UpdateOperation,
|
||||
events <-chan engine.Event, done chan<- bool, opts display.Options, isPreview bool) {
|
||||
|
@ -210,7 +225,7 @@ func (u *cloudUpdate) RecordAndDisplayEvents(
|
|||
}
|
||||
|
||||
if e.Type == engine.CancelEvent {
|
||||
return
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -292,3 +307,82 @@ func (b *cloudBackend) getTarget(ctx context.Context, stackRef backend.StackRefe
|
|||
Snapshot: snapshot,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// convertEngineEvent will try to convert the engine event to an event to send to the
|
||||
// Pulumi service. Returns nil, nil if the event has no analog to be converted. Or
|
||||
// nil, error if the engine event has an unknown type.
|
||||
func convertEngineEvent(e engine.Event) (*apitype.UpdateEngineEvent, error) {
|
||||
var updateEvent apitype.UpdateEngineEvent
|
||||
|
||||
// Error to return if the payload doesn't match expected.
|
||||
eventTypePayloadMismatch := errors.Errorf("unexpected payload for event type %v", e.Type)
|
||||
|
||||
switch e.Type {
|
||||
case engine.SummaryEvent:
|
||||
p, ok := e.Payload.(engine.SummaryEventPayload)
|
||||
if !ok {
|
||||
return nil, eventTypePayloadMismatch
|
||||
}
|
||||
// Convert the resource changes.
|
||||
changes := make(map[string]int)
|
||||
for op, count := range p.ResourceChanges {
|
||||
changes[string(op)] = count
|
||||
}
|
||||
updateEvent.Summary = &apitype.UpdateSummaryEvent{
|
||||
Duration: int(p.Duration.Seconds()),
|
||||
ResourceChanges: changes,
|
||||
}
|
||||
|
||||
case engine.ResourcePreEvent:
|
||||
p, ok := e.Payload.(engine.ResourcePreEventPayload)
|
||||
if !ok {
|
||||
return nil, eventTypePayloadMismatch
|
||||
}
|
||||
updateEvent.ResourceOpStarted = &apitype.ResourceOpStartedEvent{
|
||||
URN: string(p.Metadata.URN),
|
||||
Type: string(p.Metadata.Type),
|
||||
Operation: string(p.Metadata.Op),
|
||||
}
|
||||
|
||||
case engine.ResourceOutputsEvent:
|
||||
p, ok := e.Payload.(engine.ResourceOutputsEventPayload)
|
||||
if !ok {
|
||||
return nil, eventTypePayloadMismatch
|
||||
}
|
||||
|
||||
var changes *resource.ObjectDiff
|
||||
m := p.Metadata
|
||||
if m.Old != nil && m.New != nil && m.Old.Outputs != nil && m.New.Outputs != nil {
|
||||
changes = m.Old.Outputs.Diff(m.New.Outputs)
|
||||
// Don't send data to the service we won't endup rendering.
|
||||
changes.RemoveSames()
|
||||
}
|
||||
|
||||
updateEvent.ResourceOpFinished = &apitype.ResourceOpFinishedEvent{
|
||||
URN: string(p.Metadata.URN),
|
||||
Type: string(p.Metadata.Type),
|
||||
Operation: string(p.Metadata.Op),
|
||||
|
||||
Changes: changes,
|
||||
}
|
||||
|
||||
case engine.ResourceOperationFailed:
|
||||
p, ok := e.Payload.(engine.ResourceOperationFailedPayload)
|
||||
if !ok {
|
||||
return nil, eventTypePayloadMismatch
|
||||
}
|
||||
updateEvent.ResourceOpFailed = &apitype.ResourceOpFailedEvent{
|
||||
URN: string(p.Metadata.URN),
|
||||
Type: string(p.Metadata.Type),
|
||||
Operation: string(p.Metadata.Op),
|
||||
}
|
||||
|
||||
// Remaining event types are ignored, and error on any unexpected ones.
|
||||
case engine.CancelEvent, engine.StdoutColorEvent, engine.DiagEvent:
|
||||
return nil, nil
|
||||
default:
|
||||
return nil, errors.Errorf("unknown event type %q", e.Type)
|
||||
}
|
||||
|
||||
return &updateEvent, nil
|
||||
}
|
||||
|
|
|
@ -74,6 +74,7 @@ type PreludeEventPayload struct {
|
|||
Config map[string]string // the keys and values for config. For encrypted config, the values may be blinded
|
||||
}
|
||||
|
||||
// SummaryEventPayload is the payload for events with the SummaryEvent type.
|
||||
type SummaryEventPayload struct {
|
||||
IsPreview bool // true if this summary is for a plan operation
|
||||
MaybeCorrupt bool // true if one or more resources may be corrupt
|
||||
|
|
|
@ -74,6 +74,29 @@ func (diff *ObjectDiff) Keys() []PropertyKey {
|
|||
return ks
|
||||
}
|
||||
|
||||
// RemoveSames removes all references to "same" property values.
|
||||
func (diff *ObjectDiff) RemoveSames() {
|
||||
removeSamesFromValueDiff := func(vd *ValueDiff) {
|
||||
if arrayDiff := vd.Array; arrayDiff != nil {
|
||||
arrayDiff.Sames = nil
|
||||
for k, v := range arrayDiff.Updates {
|
||||
removeSamesFromValueDiff(&v)
|
||||
}
|
||||
}
|
||||
if objDiff := vd.Object; objDiff != nil {
|
||||
objDiff.Sames = nil
|
||||
for k, v := range objDiff.Updates {
|
||||
removeSamesFromValueDiff(&v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
diff.Sames = nil
|
||||
for k, v := range diff.Updates {
|
||||
removeSamesFromValueDiff(&v)
|
||||
}
|
||||
}
|
||||
|
||||
// ValueDiff holds the results of diffing two property values.
|
||||
type ValueDiff struct {
|
||||
Old PropertyValue // the old value.
|
||||
|
|
Loading…
Reference in a new issue